Home Reference Source

core/live/user-to-user/stream.js

import { Promise } from 'es6-promise';

import { KinveyError } from '../../errors';
import { Client } from '../../client';
import { isNonemptyString } from '../../utils';
import { getLiveService } from '../live-service';
import { KinveyRequest, RequestMethod, StatusCode } from '../../request';
import { StreamACL } from './stream-acl';


/**
 * @private
 * A Stream, created in the backend
 * @class Stream
 */
export class Stream {
  constructor(name) {
    this.name = name;
    this._client = Client.sharedInstance();
    this._liveService = getLiveService(this._client);
    this._subscribeChannels = {};
    this._publishChannels = {};
  }

  static get StreamACL() {
    return StreamACL;
  }

  /**
   * Gets all substreams of this Stream
   * @returns {Promise}
   */
  getSubstreams() {
    return this._makeStreamRequest('_substreams', RequestMethod.GET);
  }

  /**
   * Gets the current ACL for the given user's message stream
   * @param {string} userId The ID of the user whose stream ACL is being retrieved
   * @returns {Promise} Promise for the ACL object returned
   */
  getACL(userId) {
    if (!isNonemptyString(userId)) {
      return Promise.reject(new KinveyError('Invalid or missing id'));
    }

    return this._makeStreamRequest(userId, RequestMethod.GET);
  }

  /**
   * Sets (overwrites) the ACL for the given user's message stream
   * @param {string} userId The ID of the user whose stream ACL is being set
   * @param {StreamACL} acl The ACL object to be set
   * @returns {Promise}
   */
  setACL(userId, acl) {
    if (!isNonemptyString(userId)) {
      return Promise.reject(new KinveyError('Invalid or missing id'));
    }

    if (!StreamACL.isValidACLObject(acl)) {
      return Promise.reject(new KinveyError('Invalid or missing ACL object'));
    }

    if (!(acl instanceof StreamACL)) {
      acl = new StreamACL(acl);
    }

    return this._makeStreamRequest(userId, RequestMethod.PUT, acl.toPlainObject());
  }

  // Feed comm

  /**
   * Subscribes the active user to the specified user's channel
   * @param {string} userId
   * @param {MessageReceiver} receiver
   * @returns {Promise}
   */
  follow(userId, receiver) {
    return this._subscribe(userId, receiver);
  }

  /**
   * Unsubscribes the active user from the specified user's channel
   * @param {string} userId
   * @returns {Promise}
   */
  unfollow(userId) {
    return this._unsubscribe(userId);
  }

  /**
   * Posts a message to the active user's channel
   * @param {Object} message
   * @returns {Promise}
   */
  post(message) {
    const userId = this._client.getActiveUser()._id;
    return this._publish(userId, message);
  }

  // Directed comm

  /**
   * Listens for messages sent to the active user
   * @param {MessageReceiver} receiver
   * @returns {Promise<void>}
   */
  listen(receiver) {
    const userId = this._client.getActiveUser()._id;
    return this._subscribe(userId, receiver);
  }

  /**
   * Stops listening for messages sent to the active user
   * @returns {Promise<void>}
   */
  stopListening() {
    const userId = this._client.getActiveUser()._id;
    return this._unsubscribe(userId);
  }

  /**
   * Sends a message to the specified user
   * @param {string} userId
   * @param {Object} message
   */
  send(userId, message) {
    return this._publish(userId, message);
  }

  /**
   * @private
   * @param {string} substreamOwnerId
   * @returns {Promise}
   */
  _unsubscribeFromSubstream(substreamOwnerId) {
    const path = `${substreamOwnerId}/unsubscribe`;

    return this._makeStreamRequest(path, RequestMethod.POST, { deviceId: this._client.deviceId });
  }

  /**
   * @private
   * @param {string} userId
   * @param {MessageReceiver} receiver
   * @returns {Promise}
   */
  _subscribe(userId, receiver) {
    return this._requestSubscribeAccess(userId)
      .then(() => {
        const channelName = this._subscribeChannels[userId];
        this._liveService.subscribeToChannel(channelName, receiver);
      });
  }

  /**
   * @private
   * @param {string} userId
   * @returns {Promise<void>}
   */
  _unsubscribe(userId) {
    return this._unsubscribeFromSubstream(userId)
      .then(() => {
        const channelName = this._subscribeChannels[userId];
        this._liveService.unsubscribeFromChannel(channelName);
        delete this._subscribeChannels[userId];
      });
  }

  /**
   * @private
   * @param {string} userId
   * @param {Object} message
   * @returns {Promise}
   */
  _publish(userId, message) {
    if (this._publishChannels[userId]) {
      return this._retriablePublish(userId, message);
    }
    return this._publishWithAccessRequest(userId, message);
  }

  /**
   * @private
   * @param {string} userId
   * @param {Object} message
   * @returns {Promise}
   */
  _publishWithAccessRequest(userId, message) {
    return this._requestPublishAccess(userId)
      .then(() => {
        const channelName = this._publishChannels[userId];
        return this._liveService.publishToChannel(channelName, message);
      });
  }

  /**
   * @private
   * @param {string} userId
   * @param {Object} message
   * @returns {Promise}
   */
  _retriablePublish(userId, message) {
    const channelName = this._publishChannels[userId];
    return this._liveService.publishToChannel(channelName, message)
      .catch((err) => {
        let promise = Promise.reject(err);
        if (err.statusCode === StatusCode.Forbidden) {
          promise = this._publishWithAccessRequest(userId, message);
        }
        return promise;
      });
  }

  /**
   * @private
   * @param {string} userId
   * @returns {Promise}
   */
  _requestPublishAccess(userId) {
    return this._makeStreamRequest(`${userId}/publish`, RequestMethod.POST)
      .then((resp) => {
        if (resp.substreamChannelName) {
          this._publishChannels[userId] = resp.substreamChannelName;
        }
        return resp;
      });
  }

  /**
   * @private
   * @param {string} userId
   * @returns {Promise}
   */
  _requestSubscribeAccess(userId) {
    const requestBody = { deviceId: this._client.deviceId };

    return this._makeStreamRequest(`${userId}/subscribe`, RequestMethod.POST, requestBody)
      .then((response) => {
        this._subscribeChannels[userId] = response.substreamChannelName;
        return response;
      });
  }

  /**
   * @private
   * @param {string} path The path after the stream/kid part
   * @param {RequestMethod} method The request method to be used
   * @param {Object} [body] The body of the request, if applicable
   * @returns {Promise}
   */
  _makeStreamRequest(path, method, body) {
    return KinveyRequest.execute({
      method: method,
      pathname: `/stream/${this._client.appKey}/${this.name}/${path}`,
      body: body
    }, this._client);
  }
}