Source: index.js

import EventEmitter from 'events'

/**
 * ## default options
 * Baseline settings copied onto every new Throttle instance before the
 * caller-supplied options are applied.
 */
const defaults = {
  // whether the throttle starts unpaused
  active: true,
  // number of requests allowed within each `ratePer` window
  rate: 40,
  // window length in ms over which `rate` requests may be sent
  ratePer: 40000,
  // upper bound on simultaneously in-flight requests
  concurrent: 20
}

/**
 * ## Throttle
 * Queues requests and releases them subject to three limits: a rate limit
 * (`rate` requests per `ratePer` ms), a concurrency limit (`concurrent`) and
 * optional per-namespace serialisation (see `plugin(serial)`).
 *
 * Emits `sent`, `received`, `drained` and (when listened for) `error`.
 *
 * @class
 * @param {object} options - key value options, merged over the defaults
 */
class Throttle extends EventEmitter {
  constructor (options) {
    super()
    // instance bookkeeping
    this._options({
      // timestamps of the most recent sends; seeded with 0 so an empty
      // history never looks rate bound
      _requestTimes: [0],
      // number of requests currently in flight
      _current: 0,
      // requests waiting to be sent
      _buffer: [],
      // serial namespace -> true while a request in that namespace is in flight
      _serials: {},
      // true when every buffered request is blocked by a busy serial namespace
      _isSerialBound: false,
      // handle of the pending re-cycle timer (false when none)
      _timeout: false
    })
    this._options(defaults)
    this._options(options)
  }

  /**
   * ## _options
   * copies the own enumerable properties of `options` onto the instance
   *
   * Non-object values (including strings, which would otherwise be enumerated
   * character by character) are ignored.
   *
   * @method
   * @param {Object} options - key value object
   * @returns {undefined}
   */
  _options (options) {
    if (options === null || typeof options !== 'object') {
      return
    }
    for (const property in options) {
      // call through Object.prototype so prototype-less objects work too
      if (Object.prototype.hasOwnProperty.call(options, property)) {
        this[property] = options[property]
      }
    }
  }

  /**
   * ## options
   * thin wrapper for _options
   *
   *  * calls `this.cycle()`
   *  * adds alternate syntax
   *
   * alternate syntax:
   * throttle.options('active', true)
   * throttle.options({active: true})
   *
   * @method
   * @param {Object|string} options - either key value object or keyname
   * @param {Mixed} [value] - value for key; falsy values such as `false` or
   *   `0` are honoured, `undefined` leaves the key untouched
   * @returns {undefined}
   */
  options (options, value) {
    let settings = options
    if (typeof options === 'string') {
      // computed key: the named option is set, not a property called "options"
      settings = (value === undefined) ? undefined : { [options]: value }
    }
    this._options(settings)
    this.cycle()
  }

  /**
   * ## next
   * checks whether instance has available capacity and, if so, sends the
   * first buffered request that is not blocked by its serial namespace
   *
   * @returns {Boolean} true when a request was sent
   */
  next () {
    const throttle = this
    // make requestTimes `throttle.rate` long. Oldest request will be 0th index
    throttle._requestTimes =
      throttle._requestTimes.slice(throttle.rate * -1)

    if (
      // paused
      !(throttle.active) ||
      // at concurrency limit
      (throttle._current >= throttle.concurrent) ||
      // less than `ratePer` ms since the oldest tracked request
      throttle._isRateBound() ||
      // nothing waiting in the throttle
      !(throttle._buffer.length)
    ) {
      return false
    }
    // first request that is either not serialised or whose namespace is idle
    const idx = throttle._buffer.findIndex((request) => {
      return !request.serial || !throttle._serials[request.serial]
    })
    if (idx === -1) {
      throttle._isSerialBound = true
      return false
    }
    throttle.send(throttle._buffer.splice(idx, 1)[0])
    return true
  }

  /**
   * ## serial
   * reads or updates throttle.\_serials, and clears throttle.\_isSerialBound
   * when a serial namespace is released
   *
   * serial subthrottles allow some requests to be serialised, whilst maintaining
   * their place in the queue. The _serials structure keeps track of what serial
   * queues are waiting for a response.
   *
   * ```
   * throttle._serials = {
   *   'example.com/end/point': true,
   *   'example.com/another': false
   * }
   * ```
   *
   * @param {Request} request superagent request
   * @param {Boolean} [state] new state for serial; omit to query
   * @returns {Boolean|undefined} current state when `state` is omitted
   */
  serial (request, state) {
    const throttle = this
    const serials = throttle._serials
    // request is not part of any serial namespace
    if (request.serial === false) {
      return
    }
    if (state === undefined) {
      return serials[request.serial]
    }
    if (state === false) {
      throttle._isSerialBound = false
    }
    serials[request.serial] = state
  }

  /**
   * ## _isRateBound
   * returns true if something is buffered and fewer than `ratePer` ms have
   * passed since the oldest tracked request
   *
   * @returns {Boolean}
   */
  _isRateBound () {
    const throttle = this
    return (
      ((Date.now() - throttle._requestTimes[0]) < throttle.ratePer) &&
      (throttle._buffer.length > 0)
    )
  }

  /**
   * ## cycle
   * an iterator of sorts. Should be called when
   *
   *  - something added to throttle (check if it can be sent immediately)
   *  - `ratePer` ms have elapsed since nth last call where n is `rate` (may have
   *    available rate)
   *  - some request has ended (may have available concurrency)
   *
   * @param {Request} [request] request to enqueue before cycling
   * @returns {undefined}
   */
  cycle (request) {
    const throttle = this
    if (request) {
      throttle._buffer.push(request)
    }
    // any previously scheduled re-cycle is superseded by this one
    clearTimeout(throttle._timeout)

    // fire requests
    // throttle.next will return false if there's no capacity or throttle is
    // drained
    while (throttle.next()) {}

    // if bound by rate, set timeout to reassess later.
    if (throttle._isRateBound()) {
      // defined rate window, less ms elapsed since oldest request, plus 1 ms
      // to ensure you don't fire a request exactly ratePer ms later
      const elapsed = Date.now() - throttle._requestTimes[0]
      const timeout = throttle.ratePer - elapsed + 1
      throttle._timeout = setTimeout(function () {
        throttle.cycle()
      }, timeout)
    }
  }

  /**
   * ## send
   *
   * sends a queued request: marks its serial namespace busy, invokes the
   * original `end` with a cleanup callback, and records the send.
   *
   * @param {Request} request superagent request
   * @returns {undefined}
   */
  send (request) {
    const throttle = this
    throttle.serial(request, true)

    // declare callback within this enclosure, for access to throttle & request
    function cleanup (err, response) {
      throttle._current -= 1
      // only emit when someone listens: an unhandled 'error' event would throw
      // NOTE(review): the payload is `response`, not `err` - kept as-is for
      // existing listeners; confirm whether that is intended
      if (err && throttle.listenerCount('error')) {
        throttle.emit('error', response)
      }
      throttle.emit('received', request)

      if (
        (!throttle._buffer.length) &&
        (!throttle._current)
      ) {
        throttle.emit('drained')
      }
      throttle.serial(request, false)
      throttle.cycle()
      // original `callback` was stored at `request._maskedCallback`
      request._maskedCallback(err, response)
    }

    // original `request.end` was stored at `request._maskedEnd`
    request._maskedEnd(cleanup)
    throttle._requestTimes.push(Date.now())
    throttle._current += 1
    throttle.emit('sent', request)
  }

  /**
   * ## plugin
   *
   * `superagent` `use` function should refer to this plugin method a la
   * `.use(throttle.plugin())`
   *
   * mask the original `.end` and store the callback passed in
   *
   * @method
   * @param {string} [serial] any string is ok, it's just a namespace
   * @returns {Function} function that patches a request and returns it
   */
  plugin (serial) {
    const throttle = this
    return (request) => {
      request.throttle = throttle
      request.serial = serial || false
      // replace request.end
      request._maskedEnd = request.end
      request.end = function (callback) {
        // when superagent receives a redirect header it essentially resets
        // the original request and calls `end` again with the new target url
        // that means throttle hasn't cleaned up yet, and still considers the
        // request to be in flight.
        // Therefore, when called by a redirect, just pass through to maskedEnd
        if (request._redirects > 0) return request._maskedEnd(callback)

        request._maskedCallback = callback || function () {}
        // place this request in the queue
        request.throttle.cycle(request)
        return request
      }
      return request
    }
  }
}

// Expose the Throttle class as the module's CommonJS export.
// NOTE(review): the file imports with ES module syntax but exports through
// `module.exports` - presumably it is transpiled before use; confirm.
module.exports = Throttle