
Stop Writing Async Boilerplate: How @angular-kit/rx-stateful Transforms RxJS Development
Introduction
If you’ve ever built a modern web application, you know that handling asynchronous operations is one of the most challenging aspects of frontend development. Loading states, error handling, data refreshing, race conditions – these are all common pain points that developers face daily. While RxJS provides powerful primitives for reactive programming, managing the complete lifecycle of async operations often requires writing substantial boilerplate code.
Enter @angular-kit/rx-stateful – a lightweight yet powerful RxJS library that transforms how we handle async state in reactive applications. This library doesn’t just simplify your code; it fundamentally improves the user experience by providing sophisticated features like non-flickering loading states, automatic error handling, and intelligent refresh mechanisms.
The Problem: Async State Management is Hard
Let’s start with a typical scenario. You need to fetch data from an API and display it in your Angular component. You want to handle loading states, errors, and allow users to refresh the data.
Here’s what you typically write with plain RxJS:
// Plain RxJS approach - verbose and error-prone
export class ProductListComponent {
products$ = new BehaviorSubject<Product[] | null>(null);
loading$ = new BehaviorSubject<boolean>(false);
error$ = new BehaviorSubject<Error | null>(null);
private destroy$ = new Subject<void>();
loadProducts() {
this.loading$.next(true);
this.error$.next(null);
this.http.get<Product[]>('/api/products')
.pipe(
takeUntil(this.destroy$),
finalize(() => this.loading$.next(false))
)
.subscribe({
next: (products) => {
this.products$.next(products);
},
error: (error) => {
this.error$.next(error);
this.products$.next(null);
}
});
}
refresh() {
// Should we keep the current data while refreshing?
// Should we show a loading state?
// What about error handling?
this.loadProducts(); // This clears everything!
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}
This approach has several problems:
- Boilerplate: Managing separate subjects for data, loading, and error states
- State inconsistency: Easy to forget updating one of the states
- Poor UX: Flickering loading states for fast requests
- No refresh strategy: Losing data when refreshing
- Race conditions: No built-in handling for concurrent requests
The Solution: rxRequest from @angular-kit/rx-stateful
With @angular-kit/rx-stateful, the same functionality becomes remarkably simple:
// Using rxRequest - clean and powerful
export class ProductListComponent {
productsRequest = rxRequest({
requestFn: () => this.http.get<Product[]>('/api/products'),
});
products$ = this.productsRequest.value$();
refresh() {
this.productsRequest.refresh();
}
}
In the template:
@if(products$ | async as state){
<div>
@if(state.isSuspense){
<mat-spinner></mat-spinner>
}
@if(state.hasError) {
<error-message [error]="state.error"></error-message>
}
@if(state.hasValue) {
<product-list [products]="state.value"></product-list>
}
</div>
}
Here’s what you gain:
- Minimal Boilerplate: One
rxRequest
call encapsulates all state management - Consistent State: Single source of truth for data, loading, and error states
- Smart Refresh: Convenient
refresh()
method - Race Condition Handling: Built-in request cancellation and proper sequencing
Core Features That Make the Difference
In this section we will explore all core features of the library.
1. 🔄 Unified State Stream
Instead of managing multiple observables, rx-stateful provides a single stream that encapsulates all state:
interface RxStateful<T, E> {
value: T | null; // The actual data
hasValue: boolean; // Convenience flag for data presence
context: 'suspense' | 'next' | 'error'; // Current state context
isSuspense: boolean; // Loading state
hasError: boolean; // Error presence flag
error: E | undefined; // Error details
}
This unified approach ensures that your state is always consistent and predictable.
2. ⚡ Non-Flickering Loading States
A common UX issue is flickering loading indicators for fast requests. rxRequest
solves this elegantly:
rxRequest({
requestFn: () => api.getData(),
config: {
suspenseThresholdMs: 500, // Wait 500ms before showing loader
suspenseTimeMs: 500 // Show loader for at least 500ms
}
})
In the viedo above you can see on the left side a flickering loading spinner for a fast requests. The loading spinner is blinking quickly.
In the middle you can see that no loading spinner is shown: In this example the request does not exceed the threshold and therefor no loading spinner is shown at all.
On the right side you can see that the loading spinner is shown for at least 500ms (suspenseTimeMs), even if the request is completed before that, preventing a flicker effect. In this case the request does take longer than the defined threshold.
This configuration ensures that:
- Fast requests (< 500ms) don’t show a loader at all
- When a loader is shown, it stays visible for at least 500ms (no flicker!)
3. 🔄 Smart Refresh Strategies
The library offers multiple refresh strategies that can be combined:
// Auto-refresh every 5 seconds for 1 minute
withAutoRefetch(5000, 60000)
// Auto-refresh until a signal
const stopPolling$ = new Subject<void>();
withAutoRefetch(5000, stopPolling$)
// Manual refresh with a trigger
withRefetchOnTrigger(refreshButton$)
// Combine multiple strategies
refetchStrategies: [
withAutoRefetch(10000, Infinity),
withRefetchOnTrigger(userRefresh$)
]
4. 🔄 Controlling Refresh Behavior
Clear value on refresh (default)
By default, the value is cleared when refreshing, showing a clean loading state.
Keep value on refresh
With keepValueOnRefresh: true
, the old value remains visible while fetching new data:
rxRequest({
requestFn: () => api.getData(),
config: {
keepValueOnRefresh: true // Old data stays visible
}
})
5. 🔴 Automatic Multicasting
All streams are automatically multicasted, preventing duplicate HTTP requests:
// This makes only ONE HTTP call!
const request = rxRequest({
requestFn: () => api.getData()
});
// Multiple subscriptions, single HTTP request
const value1$ = request.value$();
const value2$ = request.value$();
Unlike default Observable behavior, only one HTTP request is made even with multiple subscribers.
6. ⚙️ Powerful Configuration
Configure behavior globally or per-instance:
// Global configuration (in app.config.ts or AppModule)
provideRxStatefulConfig({
keepValueOnRefresh: true,
suspenseThresholdMs: 300,
suspenseTimeMs: 300
})
// Instance override
rxRequest({
requestFn: () => api.getData(),
config: {
keepValueOnRefresh: false, // Override global config
keepErrorOnRefresh: true,
errorMappingFn: (error) => error.message
}
})
7. 🎯 Error Handling and Recovery
Errors are automatically captured and exposed in the state stream:
rxRequest({
requestFn: () => api.getData(),
config: {
keepErrorOnRefresh: true, // Keep error visible on refresh
errorMappingFn: (error) => error.message, // Transform error for display
beforeHandleErrorFn: (error) => console.error(error) // Log errors
}
})
Advanced Use Cases
Use Case 1: Parameterized Requests with Triggers
// Search with debounce and loading states
searchTrigger$ = new Subject<string>();
searchResults = rxRequest({
trigger: this.searchTrigger$.pipe(
debounceTime(300),
distinctUntilChanged()
),
requestFn: (query: string) => this.api.search(query),
config: {
keepValueOnRefresh: false, // Clear old results
suspenseThresholdMs: 200, // Quick loader for search
operator: 'switch' // Cancel previous searches
}
});
// Trigger search
onSearch(query: string) {
this.searchTrigger$.next(query);
}
Use Case 2: Polling with Conditional Stop
// Poll until condition is met
const stopPolling$ = new Subject<void>();
orderStatus = rxRequest({
requestFn: () => this.api.getOrderStatus(orderId),
config: {
refetchStrategies: withAutoRefetch(2000, stopPolling$)
}
});
// Stop polling when order is complete
orderStatus.value$().pipe(
filter(state => state.value?.status === 'completed'),
take(1)
).subscribe(() => stopPolling$.next());
Use Case 3: Paginated Data Loading
// Pagination with state preservation
page$ = new BehaviorSubject<number>(1);
paginatedData = rxRequest({
trigger: this.page$,
requestFn: (page: number) => this.api.getProducts({ page, limit: 10 }),
config: {
keepValueOnRefresh: false, // Clear data between pages
}
});
// Navigate pages
nextPage() {
this.page$.next(this.page$.value + 1);
}
previousPage() {
this.page$.next(this.page$.value - 1);
}
Use Case 4: Dependent Requests
// Load user, then load their permissions
userId$ = new Subject<string>();
userRequest = rxRequest({
trigger: this.userId$,
requestFn: (id: string) => this.api.getUser(id)
});
permissionsRequest = rxRequest({
trigger: this.userRequest.value$().pipe(
filter(state => state.hasValue),
map(state => state.value!.id)
),
requestFn: (userId: string) => this.api.getUserPermissions(userId)
});
Comparison: Plain RxJS vs rx-stateful
Let’s look at a comprehensive example showing the dramatic difference:
Plain RxJS Implementation
class DataService {
private dataSubject$ = new BehaviorSubject<Data | null>(null);
private loadingSubject$ = new BehaviorSubject<boolean>(false);
private errorSubject$ = new BehaviorSubject<Error | null>(null);
private refreshInterval$: Subscription;
private destroy$ = new Subject<void>();
data$ = this.dataSubject$.asObservable();
loading$ = this.loadingSubject$.asObservable();
error$ = this.errorSubject$.asObservable();
private loadingTimer$: Subscription;
private minimumLoadingTime = 500;
private currentId: string;
loadData(id: string) {
this.currentId = id;
// Start loading
const loadingStartTime = Date.now();
this.loadingSubject$.next(true);
this.errorSubject$.next(null);
this.http.get<Data>(`/api/data/${id}`)
.pipe(
// Handle minimum loading time
switchMap(data => {
const elapsed = Date.now() - loadingStartTime;
const remaining = Math.max(0, this.minimumLoadingTime - elapsed);
return timer(remaining).pipe(map(() => data));
}),
takeUntil(this.destroy$)
)
.subscribe({
next: (data) => {
this.dataSubject$.next(data);
this.loadingSubject$.next(false);
},
error: (error) => {
this.errorSubject$.next(error);
this.loadingSubject$.next(false);
this.dataSubject$.next(null);
}
});
}
refresh(keepData = false) {
if (!keepData) {
this.dataSubject$.next(null);
}
const currentId = this.currentId;
if (currentId) {
this.loadData(currentId);
}
}
startAutoRefresh(interval: number) {
this.stopAutoRefresh();
this.refreshInterval$ = interval(interval)
.pipe(takeUntil(this.destroy$))
.subscribe(() => this.refresh(true));
}
stopAutoRefresh() {
this.refreshInterval$?.unsubscribe();
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
this.stopAutoRefresh();
}
}
rx-stateful Implementation
class DataService {
private trigger$ = new Subject<string>();
dataRequest = rxRequest({
trigger: this.trigger$,
requestFn: (id: string) => this.http.get<Data>(`/api/data/${id}`),
config: {
keepValueOnRefresh: true,
suspenseThresholdMs: 500,
suspenseTimeMs: 500,
refetchStrategies: withAutoRefetch(30000, Infinity)
}
});
data$ = this.dataRequest.value$();
loadData(id: string) {
this.trigger$.next(id);
}
refresh() {
this.dataRequest.refresh();
}
}
The difference is staggering:
- 80% less code to write and maintain
- Zero subscription management needed
- Built-in race condition handling
- Automatic error recovery
- Sophisticated UX features out of the box
Testing
The library provides testing utilities to make testing components using rx-stateful simple:
import { mockRxRequest } from '@angular-kit/rx-stateful/testing';
it('should handle data loading', () => {
const mockRequest = mockRxRequest<User[]>();
// Replace the real request with mock
component.userRequest = mockRequest.instance;
// Emit test data
mockRequest.state$Trigger.next({
value: [{ id: 1, name: 'Test' }],
hasValue: true,
context: 'next',
hasError: false,
error: undefined,
isSuspense: false
});
// Assert component behavior
component.data$.subscribe(state => {
expect(state.value).toEqual([{ id: 1, name: 'Test' }]);
});
// Test refresh functionality
component.refresh();
expect(mockRequest.refreshTrigger).toHaveBeenCalled();
});
Real-World Impact
After implementing rx-stateful in production applications, teams have reported:
- Reduction in async-related bugs
- Improved user experience with non-flickering loaders
- Faster development with less boilerplate
- Better maintainability with cleaner, more declarative code
- Consistent error handling across the application
Conclusion
@angular-kit/rx-stateful
is a perfect fit to reduce the complexity of async state management and eliminiate boilerplate code.
Getting Started
npm install @angular-kit/rx-stateful
import { rxRequest } from '@angular-kit/rx-stateful';
// Start using it immediately
const myRequest = rxRequest({
requestFn: () => fetch('/api/data').then(r => r.json()),
config: {
suspenseThresholdMs: 500,
suspenseTimeMs: 500
}
});
myRequest.value$().subscribe(state => {
console.log('Current state:', state);
});
Resources
Ready to revolutionize your async state management? Start using @angular-kit/rx-stateful today and experience the difference firsthand.