core/datastore/sync/sync-manager.js
import { Promise } from 'es6-promise';
import clone from 'lodash/clone';
import { Log } from '../../log';
import { KinveyError, NotFoundError, SyncError, InvalidCachedQuery } from '../../errors';
import { getPlatformConfig } from '../../platform-configs';
import { SyncOperation } from './sync-operation';
import { maxEntityLimit, defaultPullSortField } from './utils';
import { isEmpty } from '../utils';
import { repositoryProvider } from '../repositories';
import { Query } from '../../query';
import { ensureArray, isNonemptyString, forEachAsync, splitQueryIntoPages } from '../../utils';
import { deltaSet } from '../deltaset';
import { getCachedQuery, updateCachedQuery, deleteCachedQuery } from '../querycache';
const {
maxConcurrentPullRequests: maxConcurrentPulls,
maxConcurrentPushRequests: maxConcurrentPushes,
} = getPlatformConfig();
const pushTrackingByCollection = {};
/**
* @private
*/
export class SyncManager {
_offlineRepoPromise;
_networkRepo;
/** @type {SyncStateManager} */
_syncStateManager;
constructor(networkRepo, syncStateManager) {
this._networkRepo = networkRepo;
this._syncStateManager = syncStateManager;
}
push(collection, query) {
if (isEmpty(collection) || !isNonemptyString(collection)) {
return Promise.reject(new KinveyError('Invalid or missing collection name'));
}
if (this._pushIsInProgress(collection)) {
return Promise.reject(new SyncError('Data is already being pushed to the backend.'
+ ' Please wait for it to complete before pushing new data to the backend.'));
}
this._markPushStart(collection);
let prm = Promise.resolve();
if (query) {
prm = this._getEntityIdsForQuery(collection, query);
}
return prm
.then((entityIds) => this._syncStateManager.getSyncItems(collection, entityIds))
.then((syncItems = []) => this._processSyncItems(collection, syncItems))
.then((pushResult) => {
this._markPushEnd(collection);
return pushResult;
})
.catch((err) => {
this._markPushEnd();
return Promise.reject(err);
});
}
pull(collection, query, options = {}) {
return Promise.resolve()
.then(() => {
if (!isNonemptyString(collection)) {
throw new KinveyError('Invalid or missing collection name');
}
})
.then(() => {
if (options.useDeltaSet) {
return deltaSet(collection, query, options)
.then((response) => {
return getCachedQuery(collection, query)
.then((cachedQuery) => {
if (cachedQuery) {
cachedQuery.lastRequest = response.headers.requestStart;
return updateCachedQuery(cachedQuery);
}
return null;
})
.then(() => response.data);
})
.then((data) => {
if (data.deleted.length > 0) {
const deleteQuery = new Query();
deleteQuery.contains('_id', data.deleted.map((entity) => entity._id));
return this._deleteOfflineEntities(collection, deleteQuery)
.then(() => data);
}
return data;
})
.then((data) => {
if (data.changed.length > 0) {
return this._getOfflineRepo()
.then((offlineRepo) => offlineRepo.update(collection, data.changed))
.then(() => data.changed.length);
}
return 0;
});
} else if (options.autoPagination) {
return this._paginatedPull(collection, query, options);
}
return this._fetchItemsFromServer(collection, query, options)
.then((response) => {
return getCachedQuery(collection, query)
.then((cachedQuery) => {
if (cachedQuery && response.headers) {
cachedQuery.lastRequest = response.headers.requestStart;
return updateCachedQuery(cachedQuery);
}
return null;
})
.then(() => response.data ? response.data : response);
})
.then((data) => this._replaceOfflineEntities(collection, query, data).then((data) => data.length));
})
.catch((error) => {
if (error instanceof InvalidCachedQuery) {
return getCachedQuery(collection, query)
.then((cachedQuery) => deleteCachedQuery(cachedQuery))
.catch((error) => {
if (error instanceof NotFoundError) {
return null;
}
throw error;
})
.then(() => this.pull(collection, query, Object.assign(options, { useDeltaSet: false })));
}
throw error;
});
}
getSyncItemCount(collection) {
if (!isNonemptyString(collection)) {
return Promise.reject(new KinveyError('Invalid or missing collection name'));
}
return this._syncStateManager.getSyncItemCount(collection);
}
// TODO: this method is temporray, pending fix for MLIBZ-2177
getSyncItemCountByEntityQuery(collection, query) {
if (!query) {
return this._syncStateManager.getSyncItemCount(collection);
}
return this._getOfflineRepo()
.then(repo => repo.read(collection, query))
.then((entities) => {
const entityIds = entities.map(e => e._id);
return this._syncStateManager.getSyncItemCount(collection, entityIds);
});
}
// TODO: this only returns nondeleted entities - pending fix for MLIBZ-2177
getSyncEntities(collection, query) {
return this._getOfflineRepo()
.then(repo => repo.read(collection, query))
.then((entities = []) => {
return this._syncStateManager.getSyncItems(collection, entities.map(e => e._id));
});
}
// TODO: pending fix for MLIBZ-2177
clearSync(collection, query) {
if (query) {
return this._getEntityIdsForQuery(collection, query)
.then(entityIds => this._syncStateManager.removeSyncItemsForIds(collection, entityIds));
}
return this._syncStateManager.removeAllSyncItems(collection);
}
// for SyncStateManager
addCreateEvent(collection, createdItems) {
return this._addEvent(collection, createdItems, SyncOperation.Create);
}
addDeleteEvent(collection, deletedEntities) {
return this._addEvent(collection, deletedEntities, SyncOperation.Delete);
}
addUpdateEvent(collection, updatedEntities) {
return this._addEvent(collection, updatedEntities, SyncOperation.Update);
}
removeSyncItemForEntityId(collection, entityId) {
return this._syncStateManager.removeSyncItemForEntityId(collection, entityId);
}
removeSyncItemsForIds(collection, entityIds) {
return this._syncStateManager.removeSyncItemsForIds(collection, entityIds);
}
_deleteOfflineEntities(collection, query) {
return this._getOfflineRepo()
.then(repo => repo.delete(collection, query));
}
_replaceOfflineEntities(collection, deleteOfflineQuery, networkEntities = []) {
// TODO: this can potentially be deleteOfflineQuery.and().notIn(networkEntitiesIds)
// but inmemory filtering with this filter seems to take too long
if (deleteOfflineQuery && (deleteOfflineQuery.hasSkip() || deleteOfflineQuery.hasLimit())) {
return this._getOfflineRepo()
.then((repo) => repo.update(collection, networkEntities));
}
return this._deleteOfflineEntities(collection, deleteOfflineQuery)
.then(() => this._getOfflineRepo())
.then(repo => repo.update(collection, networkEntities));
}
_getPushOpResult(entityId, operation) {
const result = {
_id: entityId,
operation: operation
};
if (operation !== SyncOperation.Delete) {
result.entity = null;
}
return result;
}
_sanitizeOfflineEntity(offlineEntity) {
const copy = clone(offlineEntity);
delete copy._id;
return copy;
}
_replaceOfflineEntityWithNetwork(collection, offlineEntityId, networkEntity) {
let offlineRepo;
return this._getOfflineRepo()
.then((repo) => {
offlineRepo = repo;
return offlineRepo.deleteById(collection, offlineEntityId);
})
.then(() => offlineRepo.create(collection, networkEntity));
}
_pushCreate(collection, entity) {
let entityToCreate = entity;
if (entity._kmd && entity._kmd.local) {
entityToCreate = this._sanitizeOfflineEntity(entity);
}
const result = this._getPushOpResult(entity._id, SyncOperation.Create);
return this._networkRepo.create(collection, entityToCreate)
.then((createdItem) => {
result.entity = createdItem;
return this._replaceOfflineEntityWithNetwork(collection, entity._id, createdItem);
})
.then(() => result)
.catch((err) => {
result.error = err;
return result;
});
}
_pushDelete(collection, entityId) {
const result = this._getPushOpResult(entityId, SyncOperation.Delete);
return this._networkRepo.deleteById(collection, entityId)
.then(() => result)
.catch((err) => {
result.error = err;
return result;
});
}
_pushUpdate(collection, entity) {
const result = this._getPushOpResult(entity._id, SyncOperation.Update);
return this._networkRepo.update(collection, entity)
.then((updateResult) => {
result.entity = updateResult;
return this._getOfflineRepo();
})
.then(repo => repo.update(collection, result.entity))
.then(() => result)
.catch((err) => {
result.entity = entity;
result.error = err;
return result;
});
}
_handlePushOp(collection, syncItem, offlineEntity) {
const { state, entityId } = syncItem;
const syncOp = state.operation;
switch (syncOp) {
case SyncOperation.Create:
return this._pushCreate(collection, offlineEntity);
case SyncOperation.Delete:
return this._pushDelete(collection, entityId);
case SyncOperation.Update:
return this._pushUpdate(collection, offlineEntity);
default: {
const res = this._getPushOpResult(entityId, syncOp);
res.error = new SyncError(`Unexpected sync operation: ${syncOp}`);
return res;
}
}
}
_pushItem(collection, syncItem) {
const { entityId, state } = syncItem;
return this._getOfflineRepo()
.then(repo => repo.readById(collection, entityId)) // TODO: we've already read the entities once, if a query was provided
.catch((err) => {
if (!(err instanceof NotFoundError)) {
return Promise.reject(err);
}
if (state.operation !== SyncOperation.Delete) {
return this._syncStateManager.removeSyncItemForEntityId(collection, entityId)
.then(() => Promise.reject(err));
}
return null; // we have to make a delete request to the backend
})
.then(offlineEntity => this._handlePushOp(collection, syncItem, offlineEntity));
}
_processSyncItem(collection, syncItem) {
return this._pushItem(collection, syncItem)
.then((result) => {
if (result.error) {
return result;
}
return this._syncStateManager.removeSyncItemForEntityId(syncItem.collection, syncItem.entityId)
.then(() => result);
})
.catch((err) => {
const pushResult = this._getPushOpResult(syncItem.entityId, syncItem.state.operation);
pushResult.error = err;
return pushResult;
});
}
_processSyncItems(collection, syncItems) {
const pushResults = [];
return forEachAsync(syncItems, (syncItem) => {
return this._processSyncItem(collection, syncItem) // never rejects
.then(pushResult => pushResults.push(pushResult));
}, maxConcurrentPushes)
.then(() => pushResults);
}
_fetchItemsFromServer(collection, query, options) {
return this._networkRepo.read(collection, query, Object.assign(options, { dataOnly: false }));
}
_getOfflineRepo() {
if (!this._offlineRepoPromise) {
this._offlineRepoPromise = repositoryProvider.getOfflineRepository();
}
return this._offlineRepoPromise;
}
_pushIsInProgress(collection) {
return !!pushTrackingByCollection[collection];
}
_markPushStart(collection) {
if (!this._pushIsInProgress(collection)) {
pushTrackingByCollection[collection] = true;
} else {
Log.debug('Marking push start, when push already started');
}
}
_markPushEnd(collection) {
if (this._pushIsInProgress(collection)) {
delete pushTrackingByCollection[collection];
} else {
Log.debug('Marking push end, when push is NOT started');
}
}
_getEntityIdsForQuery(collection, query) {
return this._getOfflineRepo()
.then(repo => repo.read(collection, query))
.then(entities => entities.map(e => e._id));
}
_addEvent(collection, entities, syncOp) {
const validationError = this._validateCrudEventEntities(entities);
if (validationError) {
return validationError;
}
return this._setState(collection, entities, syncOp)
.then(() => entities);
}
_validateCrudEventEntities(entities) {
if (!entities || isEmpty(entities)) {
return Promise.reject(new SyncError('Invalid or missing entity/entities array.'));
}
const entityWithNoId = ensureArray(entities).find(e => !e._id);
if (entityWithNoId) {
const errMsg = 'An entity is missing an _id. All entities must have an _id in order to be added to the sync table.';
return Promise.reject(new SyncError(errMsg, entityWithNoId));
}
return null;
}
_setState(collection, entities, syncOp) {
switch (syncOp) {
case SyncOperation.Create:
return this._syncStateManager.addCreateEvent(collection, entities);
case SyncOperation.Update:
return this._syncStateManager.addUpdateEvent(collection, entities);
case SyncOperation.Delete:
return this._syncStateManager.addDeleteEvent(collection, entities);
default:
return Promise.reject(new SyncError('Invalid sync event name'));
}
}
_getInternalPullQuery(userQuery, totalCount) {
userQuery = userQuery || {};
const { filter, sort, fields } = userQuery;
const query = new Query({ filter, sort, fields });
query.limit = totalCount;
if (!sort || isEmpty(sort)) {
query.sort = { [defaultPullSortField]: 1 };
}
return query;
}
_fetchAndUpdateEntities(collection, query, options) {
return this._networkRepo.read(collection, query, options)
.then((entities) => {
return this._getOfflineRepo()
.then(repo => repo.update(collection, entities));
});
}
_executePaginationQueries(collection, queries, options) {
let pulledEntityCount = 0;
return forEachAsync(queries, (query) => {
return this._fetchAndUpdateEntities(collection, query, options)
.then((updatedEntities) => {
pulledEntityCount += updatedEntities.length;
});
}, maxConcurrentPulls)
.then(() => pulledEntityCount);
}
_getExpectedEntityCount(collection, userQuery) {
const countQuery = new Query({ filter: userQuery.filter });
return this._networkRepo.count(collection, countQuery, { dataOnly: false })
.then((response) => {
return {
lastRequest: response.headers ? response.headers.requestStart : undefined,
count: response.data ? response.data.count : response
};
});
}
_paginatedPull(collection, userQuery, options = {}) {
let pullQuery;
userQuery = userQuery || new Query();
return this._getExpectedEntityCount(collection, userQuery)
.then(({ lastRequest, count }) => {
pullQuery = this._getInternalPullQuery(userQuery, count);
return this._deleteOfflineEntities(collection)
.then(() => {
const pageSizeSetting = options.autoPagination && options.autoPagination.pageSize;
const pageSize = pageSizeSetting || maxEntityLimit;
const paginatedQueries = splitQueryIntoPages(pullQuery, pageSize, count);
return this._executePaginationQueries(collection, paginatedQueries, options);
})
.then((result) => {
return getCachedQuery(collection, userQuery)
.then((cachedQuery) => {
if (cachedQuery) {
cachedQuery.lastRequest = lastRequest;
return updateCachedQuery(cachedQuery);
}
return null;
})
.then(() => result);
});
});
}
}