Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/reducers/reducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,13 @@ const onNewTx = (state, action) => {
*/
const onUpdateTokenHistory = (state, action) => {
const { token, newHistory } = action.payload;
const existingData = get(state.tokensHistory, `${token}.data`, []);

// Create a Set of existing txIds for efficient lookup
const existingTxIds = new Set(existingData.map((tx) => tx.txId));

// Filter out any transactions that already exist to prevent duplicates
const uniqueNewHistory = newHistory.filter((tx) => !existingTxIds.has(tx.txId));
Comment on lines +892 to +898
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's still one race condition that I couldn't find, so I decided to go with the deduplication code. With this code I could've removed the other part, this would already prevent showing duplicated txs but the rest of the code is also an improvement.


return {
...state,
Expand All @@ -897,8 +904,8 @@ const onUpdateTokenHistory = (state, action) => {
[token]: {
...state.tokensHistory[token],
data: [
...get(state.tokensHistory, `${token}.data`, []),
...newHistory,
...existingData,
...uniqueNewHistory,
]
}
},
Expand Down
91 changes: 89 additions & 2 deletions src/sagas/tokens.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ const BALANCE_FETCH_MAX_RETRIES = 3;
*/
const pendingBalanceRequests = new Map();

/**
* Map to track pending history fetch requests per tokenId.
* This enables request deduplication and force upgrade capability.
*
* Structure: Map<tokenId, { force: boolean }>
* - force: whether the pending request should force a fresh fetch
*/
const pendingHistoryRequests = new Map();

/**
* This saga will create a channel to queue TOKEN_FETCH_BALANCE_REQUESTED actions and
* consumers that will run in parallel consuming those actions.
Expand Down Expand Up @@ -208,8 +217,83 @@ function* fetchTokenBalance(action) {
}
}

/**
* This saga will create a channel to queue TOKEN_FETCH_HISTORY_REQUESTED actions and
* consumers that will run in parallel consuming those actions.
*
* 1. Request deduplication: Only one fetch per tokenId at a time
* 2. Force upgrade: If a force=true request arrives while a force=false is pending,
* the pending request is upgraded to force=true
* 3. Race condition prevention: Prevents multiple consumers from processing the same tokenId
*/
function* fetchTokenHistoryQueue() {
const fetchTokenHistoryChannel = yield call(channel);

// Fork CONCURRENT_FETCH_REQUESTS threads to download token history
for (let i = 0; i < CONCURRENT_FETCH_REQUESTS; i += 1) {
yield fork(fetchTokenHistoryConsumer, fetchTokenHistoryChannel);
}

while (true) {
const action = yield take(types.TOKEN_FETCH_HISTORY_REQUESTED);
const { tokenId, force } = action;

// Check if there's already a pending request for this tokenId
if (pendingHistoryRequests.has(tokenId)) {
const pending = pendingHistoryRequests.get(tokenId);

// Upgrade to force=true if the new request has force=true
if (force && !pending.force) {
log.debug(`Upgrading pending history request for ${tokenId} to force=true`);
pending.force = true;
}

// Skip queueing duplicate request - the existing one will handle it
log.debug(`Skipping duplicate history request for ${tokenId}, pending request exists`);
continue;
}

// Create new pending entry and queue the request
pendingHistoryRequests.set(tokenId, { force });
yield put(fetchTokenHistoryChannel, action);
}
}

/**
* This saga will consume the fetchTokenHistoryChannel for TOKEN_FETCH_HISTORY_REQUEST actions
* and wait until the TOKEN_FETCH_HISTORY_SUCCESS action is dispatched with the specific tokenId
*/
function* fetchTokenHistoryConsumer(fetchTokenHistoryChannel) {
while (true) {
const action = yield take(fetchTokenHistoryChannel);

yield fork(fetchTokenHistory, action);
// Wait until the success action is dispatched before consuming another action
yield take(
specificTypeAndPayload([
types.TOKEN_FETCH_HISTORY_SUCCESS,
types.TOKEN_FETCH_HISTORY_FAILED,
], {
tokenId: action.tokenId,
}),
);
}
}

/**
* Fetches the history for a specific token.
*
* 1. Checks pendingHistoryRequests for the latest force value (supports force upgrade)
* 2. Properly cleans up pending request tracking on completion
*
* @param {Object} action - The action containing tokenId and force flag
*/
function* fetchTokenHistory(action) {
const { tokenId, force } = action;
const { tokenId } = action;

// Get the current force value from pending requests (may have been upgraded)
const pendingRequest = pendingHistoryRequests.get(tokenId);
const force = pendingRequest?.force ?? action.force;

try {
const wallet = yield select((state) => state.wallet);
Expand All @@ -230,6 +314,9 @@ function* fetchTokenHistory(action) {
} catch (e) {
log.error('Error while fetching token history.', e);
yield put(tokenFetchHistoryFailed(tokenId));
} finally {
// Clean up pending request tracking
pendingHistoryRequests.delete(tokenId);
}
}

Expand Down Expand Up @@ -374,8 +461,8 @@ export function* fetchTokenData(tokenId, force = false) {
export function* saga() {
yield all([
fork(fetchTokenBalanceQueue),
fork(fetchTokenHistoryQueue),
fork(fetchTokenMetadataQueue),
takeEvery(types.TOKEN_FETCH_HISTORY_REQUESTED, fetchTokenHistory),
takeEvery(types.NEW_TOKEN, routeTokenChange),
takeEvery(types.SET_TOKENS, routeTokenChange),
]);
Expand Down
34 changes: 22 additions & 12 deletions src/screens/MainScreen.js
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,9 @@ class MainScreen extends React.Component {
class TxHistoryView extends React.Component {
state = { loading: false, canLoadMore: true };

// Synchronous flag to prevent race conditions with setState
isLoadingMore = false;

renderItem = ({ item, index }) => {
const isFirst = (index === 0);
const isLast = (index === (this.props.txList.length - 1));
Expand All @@ -263,23 +266,30 @@ class TxHistoryView extends React.Component {
};

loadMoreHistory = async () => {
if (!this.state.canLoadMore) {
// Already loaded all history
if (!this.state.canLoadMore || this.isLoadingMore) {
// Already loaded all history or currently loading
return;
}

// Set synchronous flag immediately to prevent race conditions
this.isLoadingMore = true;
this.setState({ loading: true });
const newHistory = await fetchMoreHistory(
this.props.wallet,
this.props.token.uid,
this.props.txList
);

if (newHistory.length) {
this.props.updateTokenHistory(this.props.token.uid, newHistory);
this.setState({ loading: false });
} else {
this.setState({ canLoadMore: false, loading: false });
try {
const newHistory = await fetchMoreHistory(
this.props.wallet,
this.props.token.uid,
this.props.txList
);

if (newHistory.length) {
this.props.updateTokenHistory(this.props.token.uid, newHistory);
this.setState({ loading: false });
} else {
this.setState({ canLoadMore: false, loading: false });
}
} finally {
this.isLoadingMore = false;
}
}

Expand Down