In most front-end applications, fetching data from the backend is a common task to provide users with up-to-date information. While straightforward API calls often suffice, certain scenarios require more advanced techniques to handle real-time or periodic updates effectively. One such scenario is polling: repeatedly calling an API at regular intervals. In this post, we’ll explore how to implement polling using RxJS.
RxJS Refresher
RxJS (Reactive Extensions for JavaScript) is a library for reactive programming that makes working with asynchronous data streams simple and efficient. It provides tools like Observables, Operators, and Subjects to handle complex asynchronous workflows.
Why Use RxJS for Polling?
RxJS is well-suited for polling because it:
- Allows cancellation of ongoing polling tasks.
- Offers error handling mechanisms.
- Provides precise control over timing and retries.
- Enables clean, declarative code.
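To make the cancellation point concrete, here is a minimal sketch. The helper name startPolling and its callbacks are hypothetical and exist only for illustration; the sketch assumes the caller stops polling by unsubscribing.

```typescript
import { Subscription, timer } from 'rxjs';
import { finalize } from 'rxjs/operators';

// Hypothetical helper: ticks immediately, then every periodMs milliseconds.
// onStop runs when the stream ends for any reason, including unsubscribe().
function startPolling(
  periodMs: number,
  onTick: (tick: number) => void,
  onStop: () => void
): Subscription {
  return timer(0, periodMs).pipe(finalize(onStop)).subscribe(onTick);
}

const subscription = startPolling(
  1000,
  (tick) => console.log('tick', tick),
  () => console.log('polling stopped')
);

// Unsubscribing cancels the pending timer, so no further ticks are delivered.
subscription.unsubscribe();
```

In a component-based app, the same unsubscribe call would typically live in whatever teardown hook the framework provides.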
Setting Up Polling With RxJS
Here’s how we can create a robust polling mechanism using RxJS operators.
Core Operators in Polling
- timer: Creates an observable that emits values at specified intervals.
- scan: Accumulates values over time, useful for tracking attempts.
- switchMapTo: Maps each timer emission to the polling observable, canceling previous requests if necessary.
- takeWhile: Stops polling based on a condition.
- tap: Performs side effects, such as logging or validation.
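As a small illustration of how two of these operators behave, the sketch below runs them over a fixed list of statuses instead of real requests. The status values and the helper names untilComplete and countEmissions are made up for the example. It assumes RxJS 6.4 or later, where takeWhile accepts a second, inclusive argument.

```typescript
import { from, Observable } from 'rxjs';
import { scan, takeWhile } from 'rxjs/operators';

// Hypothetical stand-in for a sequence of API responses.
const responses = ['pending', 'pending', 'complete', 'never-seen'];

// takeWhile with inclusive = true also emits the first value that fails the predicate.
function untilComplete(statuses: string[]): Observable<string> {
  return from(statuses).pipe(
    takeWhile((status) => status !== 'complete', true)
  );
}

// scan keeps a running count, which is how attempts can be tracked.
function countEmissions(statuses: string[]): Observable<number> {
  return untilComplete(statuses).pipe(
    scan<string, number>((count) => count + 1, 0)
  );
}

untilComplete(responses).subscribe((status) => console.log('status:', status));
countEmissions(responses).subscribe((count) => console.log('emissions so far:', count));
```

The inclusive flag matters for polling: without it, the response that ends the polling (here 'complete') would be dropped instead of delivered to the subscriber.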
Implementing Polling With pollWhile
Below is a function that provides a reusable polling mechanism. It allows customization of the polling interval, maximum attempts, and active polling condition.
Code: pollWhile
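import { MonoTypeOperatorFunction, timer } from 'rxjs';
import { scan, switchMapTo, takeWhile, tap, last } from 'rxjs/operators';

// Returns a guard that throws once the attempt count exceeds maxAttempts.
function attemptsGuardFactory(maxAttempts: number) {
  return (attemptsCount: number) => {
    if (attemptsCount > maxAttempts) {
      throw new Error('Exceeded maxAttempts');
    }
  };
}

export function pollWhile<T>(
  pollInterval: number,
  isPollingActive: (res: T) => boolean,
  maxAttempts: number = Infinity,
  emitOnlyLast: boolean = false
): MonoTypeOperatorFunction<T> {
  return (source$) => {
    const poll$ = timer(0, pollInterval).pipe(
      // Count attempts: 1, 2, 3, ...
      scan((attempts) => attempts + 1, 0),
      // Error out if the attempt limit is exceeded
      tap(attemptsGuardFactory(maxAttempts)),
      // Resubscribe to the source on every tick, canceling any in-flight request.
      // (Recent RxJS 7 releases deprecate switchMapTo; switchMap(() => source$) is equivalent.)
      switchMapTo(source$),
      // Stop once isPollingActive returns false, still emitting that last value
      takeWhile(isPollingActive, true)
    );
    return emitOnlyLast ? poll$.pipe(last()) : poll$;
  };
}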
Key Features
- pollInterval: Specifies the time (in milliseconds) between API calls.
- isPollingActive: A function to determine whether polling should continue.
- maxAttempts: Limits the number of polling attempts.
- emitOnlyLast: If true, emits only the final result; otherwise, emits every value.
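One consequence of the maxAttempts guard is worth spelling out: because the guard throws, hitting the limit surfaces through the subscriber's error callback rather than complete. The sketch below shows that path in isolation. To keep it synchronous, range stands in for the timer ticks, and the function name guardedTicks is hypothetical.

```typescript
import { Observable, range } from 'rxjs';
import { tap } from 'rxjs/operators';

// Hypothetical helper: emits attempt numbers 1..ticks, erroring past maxAttempts.
// range() replaces timer() here only so the example runs synchronously.
function guardedTicks(ticks: number, maxAttempts: number): Observable<number> {
  return range(1, ticks).pipe(
    tap((attempt) => {
      if (attempt > maxAttempts) {
        throw new Error('Exceeded maxAttempts');
      }
    })
  );
}

guardedTicks(5, 3).subscribe({
  next: (attempt) => console.log('attempt', attempt),
  error: (err: Error) => console.error('stopped with error:', err.message),
  complete: () => console.log('completed normally'),
});
```

If running out of attempts should not be treated as a failure in your application, that error would need to be handled explicitly, for example in the error callback.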
Using the Polling Function
Here’s an example of how to use pollWhile in your application.
Example: Polling an API Until a Condition Is Met
import { of } from 'rxjs';
import { delay, map } from 'rxjs/operators';
import { pollWhile } from './pollWhile'; // Import the polling function

// Mock API call
function mockApiCall() {
  const statuses = ['pending', 'pending', 'complete'];
  let index = 0;
  return of(null).pipe(
    delay(1000), // Simulate network delay
    map(() => statuses[index++])
  );
}

const isPollingActive = (status: string) => status !== 'complete';

mockApiCall().pipe(
  pollWhile<string>(
    2000, // Polling interval: 2 seconds
    isPollingActive, // Continue polling until status is 'complete'
    5, // Maximum attempts
    true // Emit only the final result
  )
).subscribe({
  next: (result) => console.log('Final result:', result),
  error: (err) => console.error('Error:', err),
  complete: () => console.log('Polling complete.')
});
Advanced Polling: Exponential Backoff
In some cases, such as when a server is overloaded, increasing the delay between requests (exponential backoff) is a better approach to reduce server load.
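To make the growth concrete, here is a small sketch that computes the waits between attempts for a given starting interval and growth factor. The function name backoffDelays is hypothetical, and the sketch assumes the first wait equals the base interval.

```typescript
// Hypothetical helper: delay (in ms) before each of the next `count` attempts.
// Delay n (0-based) is baseInterval * growthFactor^n.
function backoffDelays(
  baseInterval: number,
  growthFactor: number,
  count: number
): number[] {
  return Array.from(
    { length: count },
    (_, n) => baseInterval * Math.pow(growthFactor, n)
  );
}

console.log(backoffDelays(2000, 1.5, 4)); // [2000, 3000, 4500, 6750]
```

With a growth factor of 1.5, the fourth wait is already more than three times the first, which is what eases the pressure on a struggling server.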
Code: Exponential Backoff With pollWhile
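import { MonoTypeOperatorFunction, of, timer } from 'rxjs';
import { expand, map, switchMapTo, takeWhile, tap, last } from 'rxjs/operators';

// attemptsGuardFactory is the same helper defined alongside the basic pollWhile above.

export function pollWhile<T>(
  pollInterval: number,
  growthFactor: number,
  isPollingActive: (res: T) => boolean,
  maxAttempts: number = Infinity,
  emitOnlyLast: boolean = false
): MonoTypeOperatorFunction<T> {
  return (source$) => {
    // Start with attempt 1 immediately, then schedule each later attempt
    // after a delay that grows by growthFactor every time.
    const poll$ = of(1).pipe(
      expand((attempts) =>
        // Wait pollInterval * growthFactor^(attempts - 1), then emit the next attempt number
        timer(pollInterval * Math.pow(growthFactor, attempts - 1)).pipe(
          map(() => attempts + 1)
        )
      ),
      tap(attemptsGuardFactory(maxAttempts)),
      switchMapTo(source$),
      takeWhile(isPollingActive, true)
    );
    return emitOnlyLast ? poll$.pipe(last()) : poll$;
  };
}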
Usage
mockApiCall().pipe(
  pollWhile<string>(
    2000, // Initial interval: 2 seconds
    1.5, // Growth factor: 1.5
    isPollingActive,
    5,
    true
  )
).subscribe({
  next: (result) => console.log('Final result:', result),
  error: (err) => console.error('Error:', err),
  complete: () => console.log('Polling complete.')
});
Benefits of Polling With RxJS
- Control: Set conditions for stopping polling dynamically.
- Efficiency: Avoid overlapping requests with operators like switchMapTo.
- Flexibility: Customize intervals, maximum attempts, and emit behavior.
- Error Handling: Easily integrate error-handling logic using RxJS operators.
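As one hedged illustration of the error-handling point, the sketch below wraps a request observable with retry and catchError before it would be handed to a polling operator. The names flakyStatus and withRecovery are hypothetical, and the fallback value 'unknown' is an assumption made for the example.

```typescript
import { defer, Observable, of } from 'rxjs';
import { catchError, retry } from 'rxjs/operators';

// Hypothetical request that fails on its first `failures` subscriptions.
function flakyStatus(failures: number): Observable<string> {
  let calls = 0;
  return defer(() => {
    calls += 1;
    if (calls <= failures) {
      throw new Error(`Request ${calls} failed`);
    }
    return of('complete');
  });
}

// Retry a failed request up to `retries` times, then fall back to a default value.
function withRecovery(
  source$: Observable<string>,
  retries: number,
  fallback: string
): Observable<string> {
  return source$.pipe(
    retry(retries),
    catchError(() => of(fallback))
  );
}

withRecovery(flakyStatus(2), 2, 'unknown').subscribe((status) => console.log(status));
```

The wrapped observable can then be piped into pollWhile in place of the raw request, so a single failed call does not end the whole polling sequence.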
Conclusion
Polling with RxJS provides a robust and flexible way to handle repetitive API calls in modern applications. By leveraging RxJS’s operators, you can build efficient, customizable polling mechanisms that handle real-world challenges like server overload and dynamic stopping conditions. Techniques like exponential backoff also help by reducing how often you hit a server that is already under heavy load.
Try implementing polling with RxJS in your next project, and you’ll see how reactive programming can simplify complex asynchronous tasks!