Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@contentstack/datasync-manager",
"author": "Contentstack LLC <[email protected]>",
"version": "2.1.2",
"version": "2.1.3",
"description": "The primary module of Contentstack DataSync. Syncs Contentstack data with your server using Contentstack Sync API",
"main": "dist/index.js",
"dependencies": {
Expand Down
17 changes: 9 additions & 8 deletions src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { join } from 'path'
import { stringify } from 'querystring'
import { sanitizeUrl } from '@braintree/sanitize-url';
import { readFileSync } from './util/fs'
import { MESSAGES } from './util/messages'

const debug = Debug('api')
let MAX_RETRY_LIMIT
Expand Down Expand Up @@ -74,7 +75,7 @@ export const get = (req, RETRY = 1) => {
}

try {
debug(`${options.method.toUpperCase()}: ${options.path}`)
debug(MESSAGES.API.REQUEST(options.method, options.path))
let timeDelay
let body = ''
const httpRequest = request(options, (response) => {
Expand All @@ -83,12 +84,12 @@ export const get = (req, RETRY = 1) => {
.setEncoding('utf-8')
.on('data', (chunk) => body += chunk)
.on('end', () => {
debug(`status: ${response.statusCode}.`)
debug(MESSAGES.API.STATUS(response.statusCode))
if (response.statusCode >= 200 && response.statusCode <= 399) {
return resolve(JSON.parse(body))
} else if (response.statusCode === 429) {
timeDelay = Math.pow(Math.SQRT2, RETRY) * RETRY_DELAY_BASE
debug(`API rate limit exceeded. Retrying ${options.path} with ${timeDelay} ms delay`)
debug(MESSAGES.API.RATE_LIMIT(options.path, timeDelay))

return setTimeout(() => {
return get(req, RETRY)
Expand All @@ -98,7 +99,7 @@ export const get = (req, RETRY = 1) => {
} else if (response.statusCode >= 500) {
// retry, with delay
timeDelay = Math.pow(Math.SQRT2, RETRY) * RETRY_DELAY_BASE
debug(`Retrying ${options.path} with ${timeDelay} ms delay`)
debug(MESSAGES.API.RETRY(options.path, timeDelay))
RETRY++

return setTimeout(() => {
Expand All @@ -107,7 +108,7 @@ export const get = (req, RETRY = 1) => {
.catch(reject)
}, timeDelay)
} else {
debug(`Request failed\n${JSON.stringify(options)}`)
debug(MESSAGES.API.REQUEST_FAILED(options))

return reject(body)
}
Expand All @@ -116,19 +117,19 @@ export const get = (req, RETRY = 1) => {

// Set socket timeout to handle socket hang ups
httpRequest.setTimeout(options.timeout, () => {
debug(`Request timeout for ${options.path || 'unknown'}`)
debug(MESSAGES.API.REQUEST_TIMEOUT(options.path))
httpRequest.destroy()
reject(new Error('Request timeout'))
})

// Enhanced error handling for socket hang ups and connection resets
httpRequest.on('error', (error: any) => {
debug(`Request error for ${options.path || 'unknown'}: ${error?.message || 'Unknown error'} (${error?.code || 'NO_CODE'})`)
debug(MESSAGES.API.REQUEST_ERROR(options.path, error?.message, error?.code))

// Handle socket hang up and connection reset errors with retry
if ((error?.code === 'ECONNRESET' || error?.message?.includes('socket hang up')) && RETRY <= MAX_RETRY_LIMIT) {
timeDelay = Math.pow(Math.SQRT2, RETRY) * RETRY_DELAY_BASE
debug(`Socket hang up detected. Retrying ${options.path || 'unknown'} with ${timeDelay} ms delay (attempt ${RETRY}/${MAX_RETRY_LIMIT})`)
debug(MESSAGES.API.SOCKET_HANGUP_RETRY(options.path, timeDelay, RETRY, MAX_RETRY_LIMIT))
RETRY++

return setTimeout(() => {
Expand Down
57 changes: 29 additions & 28 deletions src/core/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { get, init as initAPI } from '../api'
import { existsSync, readFileSync } from '../util/fs'
import { filterItems, formatItems, groupItems, markCheckpoint } from '../util/index'
import { logger } from '../util/logger'
import { MESSAGES } from '../util/messages'
import { map } from '../util/promise.map'
import { netConnectivityIssues } from './inet'
import { Q as Queue } from './q'
Expand Down Expand Up @@ -76,15 +77,15 @@ export const init = (contentStore, assetStore) => {
config = getConfig()
Q = new Queue(contentStore, assetStore, config)
initAPI(config.contentstack)
debug('Sync core:start invoked')
debug(MESSAGES.SYNC_CORE.START)

return new Promise((resolve, reject) => {
try {
Contentstack = config.contentstack
const checkPointConfig: ICheckpoint = config.checkpoint
const paths = config.paths
const environment = Contentstack.environment || process.env.NODE_ENV || 'development'
debug(`Environment: ${environment}`)
debug(MESSAGES.SYNC_CORE.ENVIRONMENT(environment))
const request: any = {
qs: {
environment,
Expand Down Expand Up @@ -136,23 +137,23 @@ const loadCheckpoint = (checkPointConfig: ICheckpoint, paths: any): void => {

// Set sync token if checkpoint is found
if (checkpoint) {
debug("Found sync token in checkpoint file:", checkpoint);
debug(MESSAGES.SYNC_CORE.TOKEN_FOUND, checkpoint);
Contentstack.sync_token = checkpoint.token;
debug("Using sync token:", Contentstack.sync_token);
debug(MESSAGES.SYNC_CORE.TOKEN_USING, Contentstack.sync_token);
}
};


function readHiddenFile(filePath: string) {
try {
if (!fs.existsSync(filePath)) {
logger.error("File does not exist:", filePath);
logger.error(MESSAGES.SYNC_CORE.FILE_NOT_FOUND(filePath));
return;
}
const data = fs.readFileSync(filePath, "utf8");
return JSON.parse(data);
} catch (err) {
logger.error("Error reading file:", err);
logger.error(MESSAGES.SYNC_CORE.FILE_READ_ERROR(err));
return undefined;
}
}
Expand All @@ -174,15 +175,15 @@ export const pop = () => {
*/
export const poke = async () => {
try {
debug('Invoked poke');
logger.info('Received \'contentstack sync\' notification')
debug(MESSAGES.SYNC_CORE.POKE_INVOKED);
logger.info(MESSAGES.SYNC_CORE.POKE_NOTIFICATION)
if (!flag.lockdown) {
flag.WQ = true
return await check()
}
return null;
} catch (error) {
debug('Error [poke]', error);
debug(MESSAGES.SYNC_CORE.POKE_ERROR, error);
throw error;
}
}
Expand All @@ -193,24 +194,24 @@ export const poke = async () => {
*/
const check = async () => {
try {
debug(`Check called. SQ status is ${flag.SQ} and WQ status is ${flag.WQ}`)
debug(MESSAGES.SYNC_CORE.CHECK_CALLED(flag.SQ, flag.WQ))
if (!flag.SQ && flag.WQ) {
flag.WQ = false
flag.SQ = true
await sync();
debug(`Sync completed and SQ flag updated. Cooloff duration is ${config.syncManager.cooloff}`)
debug(MESSAGES.SYNC_CORE.CHECK_COMPLETE(config.syncManager.cooloff))
setTimeout(() => {
flag.SQ = false
emitter.emit('check')
}, config.syncManager.cooloff)
}
} catch (error) {
logger.error(error)
debug('Error [check]', error);
debug(MESSAGES.SYNC_CORE.CHECK_ERROR, error);
check().then(() => {
debug('passed [check] error');
debug(MESSAGES.SYNC_CORE.CHECK_RECOVERED);
}).catch((error) => {
debug('failed [check] error', error);
debug(MESSAGES.SYNC_CORE.CHECK_FAILED, error);
});
throw error;
}
Expand All @@ -221,9 +222,9 @@ const check = async () => {
*/
const sync = async () => {
try {
debug('started [sync]');
debug(MESSAGES.SYNC_CORE.SYNC_STARTED);
const tokenObject = await getToken();
debug('tokenObject [sync]', tokenObject);
debug(MESSAGES.SYNC_CORE.SYNC_TOKEN_OBJECT, tokenObject);
const token: IToken = (tokenObject as IToken)
const request: any = {
qs: {
Expand All @@ -234,7 +235,7 @@ const sync = async () => {
}
return await fire(request)
} catch (error) {
debug('Error [sync]', error);
debug(MESSAGES.SYNC_CORE.SYNC_ERROR, error);
throw error
}
}
Expand All @@ -243,15 +244,15 @@ const sync = async () => {
* @description Used to lockdown the 'sync' process in case of exceptions
*/
export const lock = () => {
debug('Contentstack sync locked..')
debug(MESSAGES.SYNC_CORE.SYNC_LOCKED)
flag.lockdown = true
}

/**
* @description Used to unlock the 'sync' process in case of errors/exceptions
*/
export const unlock = (refire?: boolean) => {
debug('Contentstack sync unlocked..', refire)
debug(MESSAGES.SYNC_CORE.SYNC_UNLOCKED, refire)
flag.lockdown = false
if (typeof refire === 'boolean' && refire) {
flag.WQ = true
Expand All @@ -269,7 +270,7 @@ export const unlock = (refire?: boolean) => {
* @param {Object} req - Contentstack sync API request object
*/
const fire = (req: IApiRequest) => {
debug(`Fire called with: ${JSON.stringify(req)}`)
debug(MESSAGES.SYNC_CORE.FIRE_CALLED(req))
flag.SQ = true

return new Promise((resolve, reject) => {
Expand All @@ -279,7 +280,7 @@ const fire = (req: IApiRequest) => {
delete req.qs.sync_token
delete req.path
const syncResponse: ISyncResponse = response
debug('Response [fire]', syncResponse.items.length);
debug(MESSAGES.SYNC_CORE.FIRE_COMPLETE(syncResponse.items.length));
if (syncResponse.items.length) {
return filterItems(syncResponse, config).then(() => {
if (syncResponse.items.length === 0) {
Expand Down Expand Up @@ -319,7 +320,7 @@ const fire = (req: IApiRequest) => {
return map(contentTypeUids, (uid) => {

return new Promise((mapResolve, mapReject) => {
debug(`API called with for content type: ${uid}`)
debug(MESSAGES.SYNC_CORE.API_CALL_CT(uid))
return get({
path: `${Contentstack.apis.content_types}${uid}`,
qs: {
Expand All @@ -344,7 +345,7 @@ const fire = (req: IApiRequest) => {

return mapReject(err)
}).catch((error) => {
debug('Error [map] fetching content type schema:', error)
debug(MESSAGES.SYNC_CORE.ERROR_MAP, error)
if (netConnectivityIssues(error)) {
flag.SQ = false
}
Expand All @@ -361,11 +362,11 @@ const fire = (req: IApiRequest) => {
flag.SQ = false
}
// Errorred while fetching content type schema
debug('Error [mapResolve]:', error)
debug(MESSAGES.SYNC_CORE.ERROR_MAP_RESOLVE, error)
return reject(error)
})
}).catch((processError) => {
debug('Error [filterItems]:', processError)
debug(MESSAGES.SYNC_CORE.ERROR_FILTER_ITEMS, processError)
return reject(processError)
})
}
Expand All @@ -374,7 +375,7 @@ const fire = (req: IApiRequest) => {
.then(resolve)
.catch(reject)
}).catch((error) => {
debug('Error [fire]', error);
debug(MESSAGES.SYNC_CORE.ERROR_FIRE, error);
if (netConnectivityIssues(error)) {
flag.SQ = false
}
Expand Down Expand Up @@ -406,7 +407,7 @@ const postProcess = (req, resp) => {
req.qs[name] = resp[name]

if (flag.lockdown) {
logger.log('Checkpoint: lockdown has been invoked')
logger.log(MESSAGES.SYNC_CORE.CHECKPOINT_LOCKDOWN)
flag.requestCache = {
params: req,
reject,
Expand All @@ -419,7 +420,7 @@ const postProcess = (req, resp) => {
return resolve('')
}

debug(`Re-Fire called with: ${JSON.stringify(req)}`)
debug(MESSAGES.SYNC_CORE.REFIRE_CALLED(req))
return fire(req)
.then(resolve)
.catch(reject);
Expand Down
17 changes: 9 additions & 8 deletions src/core/inet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import dnsSocket from 'dns-socket'
import { EventEmitter } from 'events'
import { getConfig } from '../index'
import { logger } from '../util/logger'
import { MESSAGES } from '../util/messages'
import { poke } from './index'

interface ISyncManager {
Expand Down Expand Up @@ -47,7 +48,7 @@ export const init = () => {
port = sm.inet.port
dns = sm.inet.dns
currentTimeout = sm.inet.retryTimeout
debug(`inet initiated - waiting ${currentTimeout} before checking connectivity.`)
debug(MESSAGES.INET.INITIATED(currentTimeout))
// start checking for net connectivity, 30 seconds after the app has started
setTimeout(checkNetConnectivity, currentTimeout)
}
Expand All @@ -57,14 +58,14 @@ export const checkNetConnectivity = () => {
retries: sm.inet.retries,
timeout: sm.inet.timeout,
})
debug('checking network connectivity')
debug(MESSAGES.INET.CHECKING)
socket.query(query, port, dns, (err) => {
if (err) {
debug(`errorred.. ${err}`)
debug(MESSAGES.INET.CHECK_FAILED(err))
disconnected = true

return socket.destroy(() => {
debug('socket destroyed')
debug(MESSAGES.INET.CLEANUP_ERROR)
emitter.emit('disconnected', currentTimeout += sm.inet.retryIncrement)
})
} else if (disconnected) {
Expand All @@ -73,7 +74,7 @@ export const checkNetConnectivity = () => {
disconnected = false

return socket.destroy(() => {
debug('socket destroyed')
debug(MESSAGES.INET.CLEANUP_SUCCESS)
emitter.emit('ok')
})
})
Expand All @@ -92,12 +93,12 @@ export const netConnectivityIssues = (error) => {

emitter.on('ok', () => {
currentTimeout = sm.inet.retryTimeout
debug(`pinging ${sm.inet.host} in ${sm.inet.timeout} ms`)
debug(MESSAGES.INET.PINGING(sm.inet.host, sm.inet.timeout))
setTimeout(checkNetConnectivity, sm.inet.timeout)
})

emitter.on('disconnected', (timeout) => {
logger.warn('Network disconnected')
debug(`pinging ${sm.inet.host} in ${timeout} ms`)
logger.warn(MESSAGES.INET.DISCONNECTED)
debug(MESSAGES.INET.PINGING(sm.inet.host, timeout))
setTimeout(checkNetConnectivity, timeout)
})
Loading
Loading