Source: chunked-uploader.ts

/**
 * @fileoverview Upload manager for large file uploads
 */

import { Promise } from 'bluebird';

// -----------------------------------------------------------------------------
// Typedefs
// -----------------------------------------------------------------------------

/**
 * Chunk uploaded event
 * @event Chunk#uploaded
 * @param {UploadPart} data The data of the uploaded chunk
 * @private
 */

/**
 * Chunk error event
 * @event Chunk#error
 * @param {Error} err The error that occurred
 * @private
 */

/**
 * Event for when the upload is successfully aborted
 * @event ChunkedUploader#aborted
 */

/**
 * Event for when an abort attempt fails because the upload session could not
 * be destroyed. In general, the abort can be retried, and no new chunks will
 * be uploaded.
 * @event ChunkedUploader#abortFailed
 * @param {Error} err The error that occurred
 */

/**
 * Event for when a chunk fails to upload with an API error. Note that chunks
 * that fail with transient (network) errors are retried automatically and do
 * not emit this event.
 * @event ChunkedUploader#chunkError
 * @param {Error} err The error that occurred during chunk upload
 */

/**
 * Event for when a chunk is successfully uploaded
 * @event ChunkedUploader#chunkUploaded
 * @param {UploadPart} data The data for the uploaded chunk
 */

/**
 * Event for when the entire upload is complete
 * @event ChunkedUploader#uploadComplete
 * @param {Object} file The file object for the newly-uploaded file
 */

/**
 * Event for when the overall upload fails
 * @event ChunkedUploader#error
 * @param {Object} err An object containing the upload session info and the
 *   error that occurred, so the caller can retry the upload if desired
 */
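
/*
 * Example (illustrative sketch, not part of the module): consumers can listen
 * for the ChunkedUploader events documented above on an already-constructed
 * uploader instance. The `uploader` variable is assumed.
 *
 *   uploader.on('chunkUploaded', (part) => console.log('Part uploaded:', part));
 *   uploader.on('chunkError', (err) => console.error('Chunk failed:', err));
 *   uploader.on('uploadComplete', (file) => console.log('Upload complete:', file));
 *   uploader.on('error', ({ uploadSession, error }) => console.error('Upload failed:', error));
 */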

type ChunkedUploaderOptions = {
  retryInterval?: number;
  parallelism?: number;
  fileAttributes?: Record<string, any>;
};

type UploadSessionInfo = {
  id: string;
  part_size: number;
};

// -----------------------------------------------------------------------------
// Requirements
// -----------------------------------------------------------------------------

import { EventEmitter } from 'events';
import { Readable as ReadableStream } from 'stream';
import crypto from 'crypto';
import BoxClient from './box-client';

// -----------------------------------------------------------------------------
// Private
// -----------------------------------------------------------------------------

const DEFAULT_OPTIONS = Object.freeze({
  parallelism: 4,
  retryInterval: 1000,
});

/**
 * Chunk of a file to be uploaded, which handles trying to upload itself until
 * it succeeds.
 * @private
 */
class Chunk extends EventEmitter {
  client: BoxClient;
  sessionID: string;
  chunk: Buffer | string | null;
  length: number;
  offset: number;
  totalSize: number;
  options: ChunkedUploaderOptions;
  data: any /* FIXME */;
  retry: number | NodeJS.Timeout | null;
  canceled: boolean;

  /**
   * Create a Chunk, representing a part of a file being uploaded
   * @param {BoxClient} client The Box SDK client
   * @param {string} sessionID The ID of the upload session the chunk belongs to
   * @param {Buffer|string} chunk The chunk that was uploaded
   * @param {int} offset The byte offset within the file where this chunk begins
   * @param {int} totalSize The total size of the file this chunk belongs to
   * @param {Object} options The options from the ChunkedUploader
   * @param {int} options.retryInterval The number of ms to wait before retrying a chunk upload
   */
  constructor(
    client: BoxClient,
    sessionID: string,
    chunk: Buffer | string,
    offset: number,
    totalSize: number,
    options: ChunkedUploaderOptions
  ) {
    super();

    this.client = client;
    this.sessionID = sessionID;
    this.chunk = chunk;
    this.length = chunk.length;
    this.offset = offset;
    this.totalSize = totalSize;
    this.options = options;
    this.data = null;
    this.retry = null;
    this.canceled = false;
  }

  /**
   * Get the final object representation of this chunk for the API
   * @returns {UploadPart} The chunk object
   */
  getData() {
    return this.data.part;
  }

  /**
   * Upload a chunk to the API
   * @returns {void}
   * @emits Chunk#uploaded
   * @emits Chunk#error
   */
  upload() {
    this.client.files.uploadPart(
      this.sessionID,
      this.chunk!,
      this.offset,
      this.totalSize,
      (err: any /* FIXME */, data: any /* FIXME */) => {
        if (this.canceled) {
          this.chunk = null;
          return;
        }

        if (err) {
          // handle the error or retry
          if (err.statusCode) {
            // an API error, probably not retryable!
            this.emit('error', err);
          } else {
            // maybe a network error, retry
            this.retry = setTimeout(
              () => this.upload(),
              this.options.retryInterval
            );
          }
          return;
        }

        // Record the chunk data for commit, and try to free up the chunk buffer
        this.data = data;
        this.chunk = null;
        this.emit('uploaded', data);
      }
    );
  }

  /**
   * Cancel trying to upload a chunk, preventing it from retrying and clearing
   * the associated buffer
   * @returns {void}
   */
  cancel() {
    clearTimeout(this.retry as any); // number or NodeJS.Timeout
    this.chunk = null;
    this.canceled = true;
  }
}
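
/*
 * Internal usage sketch (illustrative only): the ChunkedUploader below creates
 * one Chunk per part, listens for its events, and starts the upload:
 *
 *   const chunk = new Chunk(client, sessionID, buf, offset, totalSize, options);
 *   chunk.on('uploaded', (data) => { ... });   // record the part data for commit
 *   chunk.on('error', (err) => { ... });       // surface the non-retryable error
 *   chunk.upload();
 */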

// -----------------------------------------------------------------------------
// Public
// -----------------------------------------------------------------------------

/** Manager for uploading a file in chunks */
class ChunkedUploader extends EventEmitter {
  _client: BoxClient;
  _sessionID: string;
  _partSize: number;
  _uploadSessionInfo: UploadSessionInfo;
  _stream!: ReadableStream | null;
  _streamBuffer!: Array<any>;
  _file!: Buffer | string | null;
  _size: number;
  _options: Required<ChunkedUploaderOptions>;
  _isStarted: boolean;
  _numChunksInFlight: number;
  _chunks: Array<any>;
  _position: number;
  _fileHash: crypto.Hash;
  _promise?: Promise<any>;
  _resolve?: Function;
  _reject?: Function;

  /**
   * Create an upload manager
   * @param {BoxClient} client The client to use to upload the file
   * @param {Object} uploadSessionInfo The upload session info to use for chunked upload
   * @param {ReadableStream|Buffer|string} file The file to upload
   * @param {int} size The size of the file to be uploaded
   * @param {Object} [options] Optional parameters
   * @param {int} [options.retryInterval=1000] The number of ms to wait before retrying operations
   * @param {int} [options.parallelism=4] The number of concurrent chunks to upload
   * @param {Object} [options.fileAttributes] Attributes to set on the file during commit
   */
  constructor(
    client: BoxClient,
    uploadSessionInfo: UploadSessionInfo,
    file: ReadableStream | Buffer | string,
    size: number,
    options?: ChunkedUploaderOptions
  ) {
    super();

    this._client = client;
    this._sessionID = uploadSessionInfo.id;
    this._partSize = uploadSessionInfo.part_size;
    this._uploadSessionInfo = uploadSessionInfo;

    if (file instanceof ReadableStream) {
      // Pause the stream so we can read specific chunks from it
      this._stream = file.pause();
      this._streamBuffer = [];
    } else if (file instanceof Buffer || typeof file === 'string') {
      this._file = file;
    } else {
      throw new TypeError('file must be a Stream, Buffer, or string!');
    }

    this._size = size;
    this._options = Object.assign(
      {},
      DEFAULT_OPTIONS,
      options
    ) as Required<ChunkedUploaderOptions>;

    this._isStarted = false;
    this._numChunksInFlight = 0;
    this._chunks = [];
    this._position = 0;
    this._fileHash = crypto.createHash('sha1');
  }
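
  /*
   * Usage sketch (illustrative only; the session values, `fileBuffer`, and
   * option values here are assumptions, not part of this module):
   *
   *   const uploader = new ChunkedUploader(
   *     client,
   *     { id: 'session_id', part_size: 8388608 }, // info from a previously-created upload session
   *     fileBuffer,                               // Buffer, string, or paused ReadableStream
   *     fileBuffer.length,
   *     { parallelism: 4, retryInterval: 1000 }
   *   );
   */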

  /**
   * Start an upload
   * @returns {Promise<Object>} A promise resolving to the uploaded file
   */
  start() {
    if (this._isStarted) {
      return this._promise;
    }

    // Create the initial chunks
    for (let i = 0; i < this._options.parallelism; i++) {
      this._getNextChunk((chunk: any /* FIXME */) =>
        chunk ? this._uploadChunk(chunk) : this._commit()
      );
    }
    this._isStarted = true;

    /* eslint-disable promise/avoid-new */
    this._promise = new Promise((resolve, reject) => {
      this._resolve = resolve;
      this._reject = reject;
    });
    /* eslint-enable promise/avoid-new */

    return this._promise;
  }
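
  /*
   * Sketch of starting the upload (illustrative; `uploader` is assumed to be
   * constructed as in the sketch above):
   *
   *   uploader.start()
   *     .then((file) => console.log('Uploaded file:', file))
   *     .catch((err) => console.error('Upload failed:', err));
   */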

  /**
   * Abort a running upload, which cancels all currently uploading chunks,
   * attempts to free up held memory, and aborts the upload session.  This
   * cannot be undone or resumed.
   * @returns {Promise} A promise resolving when the upload is aborted
   * @emits ChunkedUploader#aborted
   * @emits ChunkedUploader#abortFailed
   */
  abort() {
    this._chunks.forEach((chunk) => chunk.removeAllListeners().cancel());
    this._chunks = [];
    this._file = null;
    this._stream = null;

    return (
      this._client.files
        .abortUploadSession(this._sessionID)
        /* eslint-disable promise/always-return */
        .then(() => {
          this.emit('aborted');
        })
        /* eslint-enable promise/always-return */
        .catch((err: any /* FIXME */) => {
          this.emit('abortFailed', err);
          throw err;
        })
    );
  }
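
  /*
   * Sketch of aborting an in-progress upload (illustrative only):
   *
   *   uploader.abort()
   *     .then(() => console.log('Upload aborted'))
   *     .catch((err) => console.error('Abort failed; it is safe to retry:', err));
   */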

  /**
   * Get the next chunk of the file to be uploaded
   * @param {Function} callback Called with the next chunk of the file to be uploaded
   * @returns {void}
   * @private
   */
  _getNextChunk(callback: Function) {
    if (this._position >= this._size) {
      callback(null);
      return;
    }

    let buf;

    if (this._file) {
      // Buffer/string case, just get the slice we need
      buf = this._file.slice(this._position, this._position + this._partSize);
    } else if (this._streamBuffer.length > 0) {
      buf = this._streamBuffer.shift();
    } else {
      // Stream case, need to read
      buf = (this._stream as ReadableStream).read(this._partSize);

      if (!buf) {
        // stream needs to read more, retry later
        setImmediate(() => this._getNextChunk(callback));
        return;
      } else if (buf.length > this._partSize) {
        // stream is done reading and had extra data, buffer the remainder of the file
        for (let i = 0; i < buf.length; i += this._partSize) {
          this._streamBuffer.push(buf.slice(i, i + this._partSize));
        }
        buf = this._streamBuffer.shift();
      }
    }

    this._fileHash.update(buf);
    let chunk = new Chunk(
      this._client,
      this._sessionID,
      buf,
      this._position,
      this._size,
      this._options
    );
    this._position += buf.length;
    callback(chunk);
  }

  /**
   * Upload a chunk
   * @param {Chunk} chunk The chunk to upload
   * @returns {void}
   * @emits ChunkedUploader#chunkError
   * @emits ChunkedUploader#chunkUploaded
   */
  _uploadChunk(chunk: any /* FIXME */) {
    this._numChunksInFlight += 1;

    chunk.on('error', (err: any /* FIXME */) => this.emit('chunkError', err));
    chunk.on('uploaded', (data: any /* FIXME */) => {
      this._numChunksInFlight -= 1;

      this.emit('chunkUploaded', data);
      this._getNextChunk((nextChunk: any /* FIXME */) =>
        nextChunk ? this._uploadChunk(nextChunk) : this._commit()
      );
    });
    chunk.upload();
    this._chunks.push(chunk);
  }

  /**
   * Commit the upload, finalizing it
   * @returns {void}
   * @emits ChunkedUploader#uploadComplete
   * @emits ChunkedUploader#error
   */
  _commit() {
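    // Only commit once the upload is running and no chunks are still in flight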
    if (!this._isStarted || this._numChunksInFlight > 0) {
      return;
    }

    let hash = this._fileHash.digest('base64');
    this._isStarted = false;
    let options = Object.assign(
      {
        parts: this._chunks.map((c) => c.getData()),
      },
      this._options.fileAttributes
    );
    this._client.files.commitUploadSession(
      this._sessionID,
      hash,
      options,
      (err: any /* FIXME */, file: any /* FIXME */) => {
        // It's not clear what the SDK can do here, so we just return the error and session info
        // so users can retry if they wish
        if (err) {
          this.emit('error', {
            uploadSession: this._uploadSessionInfo,
            error: err,
          });
          this._reject!(err);
          return;
        }

        this.emit('uploadComplete', file);
        this._resolve!(file);
      }
    );
  }
}

export = ChunkedUploader;