Stop Writing Async Boilerplate: How @angular-kit/rx-stateful Transforms RxJS Development

19 min read
#angular #rxjs #ux #frontend #state management

Introduction

If you’ve ever built a modern web application, you know that handling asynchronous operations is one of the most challenging aspects of frontend development. Loading states, error handling, data refreshing, race conditions – these are all common pain points that developers face daily. While RxJS provides powerful primitives for reactive programming, managing the complete lifecycle of async operations often requires writing substantial boilerplate code.

Enter @angular-kit/rx-stateful – a lightweight yet powerful RxJS library that transforms how we handle async state in reactive applications. This library doesn’t just simplify your code; it fundamentally improves the user experience by providing sophisticated features like non-flickering loading states, automatic error handling, and intelligent refresh mechanisms.

The Problem: Async State Management is Hard

Let’s start with a typical scenario. You need to fetch data from an API and display it in your Angular component. You want to handle loading states, errors, and allow users to refresh the data.

Here’s what you typically write with plain RxJS:

// Plain RxJS approach - verbose and error-prone
export class ProductListComponent {
  products$ = new BehaviorSubject<Product[] | null>(null);
  loading$ = new BehaviorSubject<boolean>(false);
  error$ = new BehaviorSubject<Error | null>(null);
  
  private destroy$ = new Subject<void>();
  
  loadProducts() {
    this.loading$.next(true);
    this.error$.next(null);
    
    this.http.get<Product[]>('/api/products')
      .pipe(
        takeUntil(this.destroy$),
        finalize(() => this.loading$.next(false))
      )
      .subscribe({
        next: (products) => {
          this.products$.next(products);
        },
        error: (error) => {
          this.error$.next(error);
          this.products$.next(null);
        }
      });
  }
  
  refresh() {
    // Should we keep the current data while refreshing?
    // Should we show a loading state?
    // What about error handling?
    this.loadProducts(); // This clears everything!
  }
  
  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

This approach has several problems:

  • Boilerplate: Managing separate subjects for data, loading, and error states
  • State inconsistency: Easy to forget updating one of the states
  • Poor UX: Flickering loading states for fast requests
  • No refresh strategy: Losing data when refreshing
  • Race conditions: No built-in handling for concurrent requests

The Solution: rxRequest from @angular-kit/rx-stateful

With @angular-kit/rx-stateful, the same functionality becomes remarkably simple:

// Using rxRequest - clean and powerful
export class ProductListComponent {
  productsRequest = rxRequest({
    requestFn: () => this.http.get<Product[]>('/api/products'),
  });
  
  products$ = this.productsRequest.value$();
  
  refresh() {
    this.productsRequest.refresh();
  }
}

In the template:

@if(products$ | async as state){
  <div>
    @if(state.isSuspense){
      <mat-spinner></mat-spinner>
    }
    @if(state.hasError) {
        <error-message [error]="state.error"></error-message>
    }

    @if(state.hasValue) {
        <product-list [products]="state.value"></product-list>
    }

  </div>
}

Here’s what you gain:

  • Minimal Boilerplate: One rxRequest call encapsulates all state management
  • Consistent State: Single source of truth for data, loading, and error states
  • Smart Refresh: Convenient refresh() method
  • Race Condition Handling: Built-in request cancellation and proper sequencing

Core Features That Make the Difference

In this section we will explore all core features of the library.

1. 🔄 Unified State Stream

Instead of managing multiple observables, rx-stateful provides a single stream that encapsulates all state:

interface RxStateful<T, E> {
  value: T | null;          // The actual data
  hasValue: boolean;        // Convenience flag for data presence
  context: 'suspense' | 'next' | 'error';  // Current state context
  isSuspense: boolean;      // Loading state
  hasError: boolean;        // Error presence flag
  error: E | undefined;     // Error details
}

This unified approach ensures that your state is always consistent and predictable.
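
Because everything lives on one object, the decision about what to render can be derived from a single value. The sketch below restates the interface from above and adds a hypothetical `viewMode` helper (not part of the library) that maps a state to one display mode. The priority order (loader, then error, then value) is an assumption for illustration; with settings that keep the old value during a refresh you may prefer to show value and loader together, as the template earlier does.

```typescript
// Shape copied from the interface shown above.
interface RxStateful<T, E = unknown> {
  value: T | null;
  hasValue: boolean;
  context: 'suspense' | 'next' | 'error';
  isSuspense: boolean;
  hasError: boolean;
  error: E | undefined;
}

type ViewMode = 'loading' | 'error' | 'value' | 'empty';

// Hypothetical helper, not a library API: picks exactly one thing to render.
function viewMode<T, E>(state: RxStateful<T, E>): ViewMode {
  if (state.isSuspense) return 'loading';
  if (state.hasError) return 'error';
  if (state.hasValue) return 'value';
  return 'empty';
}

const loadedState: RxStateful<string[]> = {
  value: ['keyboard', 'mouse'],
  hasValue: true,
  context: 'next',
  isSuspense: false,
  hasError: false,
  error: undefined,
};

const mode = viewMode(loadedState); // 'value'
```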

2. ⚡ Non-Flickering Loading States

A common UX issue is flickering loading indicators for fast requests. rxRequest solves this elegantly:

rxRequest({
  requestFn: () => api.getData(),
  config: {
    suspenseThresholdMs: 500,  // Wait 500ms before showing loader
    suspenseTimeMs: 500         // Show loader for at least 500ms
  }
})

In the video above, the left side shows a flickering loading spinner for a fast request: the spinner blinks briefly.

In the middle, no loading spinner is shown at all, because the request finishes before the threshold (suspenseThresholdMs) is reached.

On the right, the request takes longer than the threshold, so the spinner appears and then stays visible for at least 500ms (suspenseTimeMs), even if the request completes sooner, which prevents the flicker.

This configuration ensures that:

  • Fast requests (< 500ms) don’t show a loader at all
  • When a loader is shown, it stays visible for at least 500ms (no flicker!)
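
To make the two timing rules concrete, here is a small model of them as a pure function. It is a sketch of the behavior described above, not the library's implementation. The name `loaderWindow` and its return shape are made up for this example, and treating a request that takes exactly the threshold as "slow" is an assumption.

```typescript
interface LoaderWindow {
  showAtMs: number; // when the loader becomes visible
  hideAtMs: number; // when the loader disappears
}

// Hypothetical model of the rules above:
// 1. requests faster than the threshold never show a loader
// 2. once visible, the loader stays for at least minVisibleMs
function loaderWindow(
  requestMs: number,
  thresholdMs: number,
  minVisibleMs: number
): LoaderWindow | null {
  if (requestMs < thresholdMs) {
    return null; // fast request: no loader at all
  }
  const showAtMs = thresholdMs;
  const hideAtMs = Math.max(requestMs, showAtMs + minVisibleMs);
  return { showAtMs, hideAtMs };
}

const fast = loaderWindow(200, 500, 500);   // null
const medium = loaderWindow(600, 500, 500); // { showAtMs: 500, hideAtMs: 1000 }
const slow = loaderWindow(2000, 500, 500);  // { showAtMs: 500, hideAtMs: 2000 }
```

The medium case is the interesting one: the response arrives at 600ms, but the loader stays until 1000ms so the user never sees a 100ms blink.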

3. 🔄 Smart Refresh Strategies

The library offers multiple refresh strategies that can be combined:

// Auto-refresh every 5 seconds for 1 minute
withAutoRefetch(5000, 60000)

// Auto-refresh until a signal
const stopPolling$ = new Subject<void>();
withAutoRefetch(5000, stopPolling$)

// Manual refresh with a trigger
withRefetchOnTrigger(refreshButton$)

// Combine multiple strategies
refetchStrategies: [
  withAutoRefetch(10000, Infinity),
  withRefetchOnTrigger(userRefresh$)
]

4. 🔄 Controlling Refresh Behavior

Clear value on refresh (default)

By default, the value is cleared when refreshing, showing a clean loading state.

Keep value on refresh

With keepValueOnRefresh: true, the old value remains visible while fetching new data:

rxRequest({
  requestFn: () => api.getData(),
  config: {
    keepValueOnRefresh: true  // Old data stays visible
  }
})
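
The difference between the two modes is easiest to see as a state transition. The following sketch models what a refresh does to the current state; `onRefresh` is a hypothetical function written for this post, not a library API. It assumes the error is cleared on refresh and ignores the suspense threshold for simplicity.

```typescript
interface RxStateful<T, E = unknown> {
  value: T | null;
  hasValue: boolean;
  context: 'suspense' | 'next' | 'error';
  isSuspense: boolean;
  hasError: boolean;
  error: E | undefined;
}

// Hypothetical model: the state emitted right after a refresh is triggered.
function onRefresh<T, E>(
  current: RxStateful<T, E>,
  keepValueOnRefresh: boolean
): RxStateful<T, E> {
  const value = keepValueOnRefresh ? current.value : null;
  return {
    value,
    hasValue: value !== null,
    context: 'suspense',
    isSuspense: true,
    hasError: false,  // assumption: errors are cleared on refresh
    error: undefined,
  };
}

const current: RxStateful<number[]> = {
  value: [1, 2, 3],
  hasValue: true,
  context: 'next',
  isSuspense: false,
  hasError: false,
  error: undefined,
};

const cleared = onRefresh(current, false); // value: null, hasValue: false
const kept = onRefresh(current, true);     // value: [1, 2, 3], hasValue: true
```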

5. 🔴 Automatic Multicasting

All streams are automatically multicast, preventing duplicate HTTP requests:

// This makes only ONE HTTP call!
const request = rxRequest({ 
  requestFn: () => api.getData() 
});

// Multiple subscriptions, single HTTP request
const value1$ = request.value$();
const value2$ = request.value$();

Unlike default Observable behavior, only one HTTP request is made even with multiple subscribers.
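
If multicasting is new to you, the idea fits in a few lines. The sketch below is a synchronous, dependency-free stand-in for an Observable: `shareLatest` is a hypothetical helper that runs its producer once and replays the result to every subscriber. In RxJS itself, operators such as `share` and `shareReplay` provide this kind of behavior; how the library achieves it internally is not covered here.

```typescript
type Listener<T> = (value: T) => void;

// Hypothetical helper: the producer runs at most once,
// no matter how many listeners subscribe.
function shareLatest<T>(producer: () => T): { subscribe(listener: Listener<T>): void } {
  let started = false;
  let latest: T | undefined;
  return {
    subscribe(listener) {
      if (!started) {
        started = true;
        latest = producer(); // the "HTTP call" happens here, once
      }
      listener(latest as T);
    },
  };
}

let calls = 0;
const shared = shareLatest(() => {
  calls += 1; // counts how often the fake request runs
  return ['product-1'];
});

const received: string[][] = [];
shared.subscribe((products) => received.push(products));
shared.subscribe((products) => received.push(products));
// calls is 1, and both subscribers received the same array
```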

6. ⚙️ Powerful Configuration

Configure behavior globally or per-instance:

// Global configuration (in app.config.ts or AppModule)
provideRxStatefulConfig({
  keepValueOnRefresh: true,
  suspenseThresholdMs: 300,
  suspenseTimeMs: 300
})

// Instance override
rxRequest({
  requestFn: () => api.getData(),
  config: {
    keepValueOnRefresh: false,  // Override global config
    keepErrorOnRefresh: true,
    errorMappingFn: (error) => error.message
  }
})

7. 🎯 Error Handling and Recovery

Errors are automatically captured and exposed in the state stream:

rxRequest({
  requestFn: () => api.getData(),
  config: {
    keepErrorOnRefresh: true,  // Keep error visible on refresh
    errorMappingFn: (error) => error.message,  // Transform error for display
    beforeHandleErrorFn: (error) => console.error(error)  // Log errors
  }
})
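
Here is a rough mental model of how the two hooks could fit together. This is a sketch, not the library's code: `toErrorState` and the `ErrorHooks` type are invented for this example, and both the order (side-effect hook first, mapping second) and the `value: null` on error are assumptions.

```typescript
interface ErrorHooks<E> {
  errorMappingFn?: (error: unknown) => E;
  beforeHandleErrorFn?: (error: unknown) => void;
}

interface ErrorState<E> {
  value: null;
  hasValue: false;
  context: 'error';
  isSuspense: false;
  hasError: true;
  error: E;
}

// Hypothetical model: turn a thrown error into an error state.
function toErrorState<E = unknown>(error: unknown, hooks: ErrorHooks<E> = {}): ErrorState<E> {
  // Side effects (logging, reporting) see the raw error first.
  hooks.beforeHandleErrorFn?.(error);
  // The mapped error is what ends up in the state.
  const mapped = hooks.errorMappingFn ? hooks.errorMappingFn(error) : (error as E);
  return {
    value: null,
    hasValue: false,
    context: 'error',
    isSuspense: false,
    hasError: true,
    error: mapped,
  };
}

const logged: unknown[] = [];
const errorState = toErrorState<string>(new Error('Network down'), {
  beforeHandleErrorFn: (e) => { logged.push(e); },
  errorMappingFn: (e) => (e instanceof Error ? e.message : String(e)),
});
// errorState.error is 'Network down'; logged holds the original Error
```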

Advanced Use Cases

Use Case 1: Parameterized Requests with Triggers

// Search with debounce and loading states
searchTrigger$ = new Subject<string>();

searchResults = rxRequest({
  trigger: this.searchTrigger$.pipe(
    debounceTime(300),
    distinctUntilChanged()
  ),
  requestFn: (query: string) => this.api.search(query),
  config: {
    keepValueOnRefresh: false,  // Clear old results
    suspenseThresholdMs: 200,    // Quick loader for search
    operator: 'switch'           // Cancel previous searches
  }
});

// Trigger search
onSearch(query: string) {
  this.searchTrigger$.next(query);
}
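
Why does cancelling previous searches matter? Without it, a slow response for an old query can overwrite the result of a newer one. The sketch below models the "latest wins" rule with a hypothetical `LatestOnly` class (not a library API). Note that it only ignores stale responses, whereas RxJS's `switchMap` actually unsubscribes from the previous inner Observable.

```typescript
// Hypothetical guard: only the most recently started request may write a result.
class LatestOnly<T> {
  private currentToken = 0;
  latest: T | undefined;

  // Call when a new request starts; all earlier tokens become stale.
  begin(): number {
    this.currentToken += 1;
    return this.currentToken;
  }

  // Call when a response arrives; returns false if the response is stale.
  resolve(token: number, value: T): boolean {
    if (token !== this.currentToken) {
      return false;
    }
    this.latest = value;
    return true;
  }
}

const search = new LatestOnly<string[]>();
const first = search.begin();  // user typed "an"
const second = search.begin(); // user typed "angular"

// Responses arrive out of order: the newer query answers first.
const acceptedSecond = search.resolve(second, ['angular']);       // true
const acceptedFirst = search.resolve(first, ['ant', 'android']);  // false, stale
// search.latest is still ['angular']
```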

Use Case 2: Polling with Conditional Stop

// Poll until condition is met
const stopPolling$ = new Subject<void>();

orderStatus = rxRequest({
  requestFn: () => this.api.getOrderStatus(orderId),
  config: {
    refetchStrategies: withAutoRefetch(2000, stopPolling$)
  }
});

// Stop polling when order is complete
orderStatus.value$().pipe(
  filter(state => state.value?.status === 'completed'),
  take(1)
).subscribe(() => stopPolling$.next());

Use Case 3: Paginated Data Loading

// Pagination with state preservation
page$ = new BehaviorSubject<number>(1);

paginatedData = rxRequest({
  trigger: this.page$,
  requestFn: (page: number) => this.api.getProducts({ page, limit: 10 }),
  config: {
    keepValueOnRefresh: false,  // Clear data between pages
  }
});

// Navigate pages
nextPage() {
  this.page$.next(this.page$.value + 1);
}

previousPage() {
  // Never go below the first page
  this.page$.next(Math.max(1, this.page$.value - 1));
}

Use Case 4: Dependent Requests

// Load user, then load their permissions
userId$ = new Subject<string>();

userRequest = rxRequest({
  trigger: this.userId$,
  requestFn: (id: string) => this.api.getUser(id)
});

permissionsRequest = rxRequest({
  trigger: this.userRequest.value$().pipe(
    filter(state => state.hasValue),
    map(state => state.value!.id)
  ),
  requestFn: (userId: string) => this.api.getUserPermissions(userId)
});

Comparison: Plain RxJS vs rx-stateful

Let’s look at a comprehensive example showing the dramatic difference:

Plain RxJS Implementation

class DataService {
  private dataSubject$ = new BehaviorSubject<Data | null>(null);
  private loadingSubject$ = new BehaviorSubject<boolean>(false);
  private errorSubject$ = new BehaviorSubject<Error | null>(null);
  private refreshInterval$: Subscription;
  private destroy$ = new Subject<void>();
  
  data$ = this.dataSubject$.asObservable();
  loading$ = this.loadingSubject$.asObservable();
  error$ = this.errorSubject$.asObservable();
  
  private loadingTimer$: Subscription;
  private minimumLoadingTime = 500;
  private currentId: string;
  
  loadData(id: string) {
    this.currentId = id;
    // Start loading
    const loadingStartTime = Date.now();
    this.loadingSubject$.next(true);
    this.errorSubject$.next(null);
    
    this.http.get<Data>(`/api/data/${id}`)
      .pipe(
        // Handle minimum loading time
        switchMap(data => {
          const elapsed = Date.now() - loadingStartTime;
          const remaining = Math.max(0, this.minimumLoadingTime - elapsed);
          return timer(remaining).pipe(map(() => data));
        }),
        takeUntil(this.destroy$)
      )
      .subscribe({
        next: (data) => {
          this.dataSubject$.next(data);
          this.loadingSubject$.next(false);
        },
        error: (error) => {
          this.errorSubject$.next(error);
          this.loadingSubject$.next(false);
          this.dataSubject$.next(null);
        }
      });
  }
  
  refresh(keepData = false) {
    if (!keepData) {
      this.dataSubject$.next(null);
    }
    const currentId = this.currentId;
    if (currentId) {
      this.loadData(currentId);
    }
  }
  
  startAutoRefresh(intervalMs: number) {
    this.stopAutoRefresh();
    // The parameter is named intervalMs so it does not shadow RxJS's interval()
    this.refreshInterval$ = interval(intervalMs)
      .pipe(takeUntil(this.destroy$))
      .subscribe(() => this.refresh(true));
  }
  
  stopAutoRefresh() {
    this.refreshInterval$?.unsubscribe();
  }
  
  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
    this.stopAutoRefresh();
  }
}

rx-stateful Implementation

class DataService {
  private trigger$ = new Subject<string>();
  
  dataRequest = rxRequest({
    trigger: this.trigger$,
    requestFn: (id: string) => this.http.get<Data>(`/api/data/${id}`),
    config: {
      keepValueOnRefresh: true,
      suspenseThresholdMs: 500,
      suspenseTimeMs: 500,
      refetchStrategies: withAutoRefetch(30000, Infinity)
    }
  });
  
  data$ = this.dataRequest.value$();
  
  loadData(id: string) {
    this.trigger$.next(id);
  }
  
  refresh() {
    this.dataRequest.refresh();
  }
}

The difference is staggering:

  • Roughly two-thirds less code to write and maintain in this example
  • Zero subscription management needed
  • Built-in race condition handling
  • Automatic error recovery
  • Sophisticated UX features out of the box

Testing

The library provides testing utilities to make testing components using rx-stateful simple:

import { mockRxRequest } from '@angular-kit/rx-stateful/testing';

it('should handle data loading', () => {
  const mockRequest = mockRxRequest<User[]>();
  
  // Replace the real request with mock
  component.userRequest = mockRequest.instance;
  
  // Emit test data
  mockRequest.state$Trigger.next({
    value: [{ id: 1, name: 'Test' }],
    hasValue: true,
    context: 'next',
    hasError: false,
    error: undefined,
    isSuspense: false
  });
  
  // Assert component behavior
  component.data$.subscribe(state => {
    expect(state.value).toEqual([{ id: 1, name: 'Test' }]);
  });
  
  // Test refresh functionality
  component.refresh();
  expect(mockRequest.refreshTrigger).toHaveBeenCalled();
});
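
Writing the full state object in every test gets repetitive. A few small factory functions can help. The helpers below are hypothetical (they are not shipped with the library) and only build plain objects in the shape of the interface shown earlier.

```typescript
interface RxStateful<T, E = unknown> {
  value: T | null;
  hasValue: boolean;
  context: 'suspense' | 'next' | 'error';
  isSuspense: boolean;
  hasError: boolean;
  error: E | undefined;
}

// Hypothetical test helpers, one per context.
function nextState<T>(value: T): RxStateful<T> {
  return {
    value,
    hasValue: value !== null && value !== undefined,
    context: 'next',
    isSuspense: false,
    hasError: false,
    error: undefined,
  };
}

function suspenseState<T>(): RxStateful<T> {
  return {
    value: null,
    hasValue: false,
    context: 'suspense',
    isSuspense: true,
    hasError: false,
    error: undefined,
  };
}

function failedState<T, E>(error: E): RxStateful<T, E> {
  return {
    value: null,
    hasValue: false,
    context: 'error',
    isSuspense: false,
    hasError: true,
    error,
  };
}

const usersLoaded = nextState([{ id: 1, name: 'Test' }]);
const usersLoading = suspenseState<{ id: number; name: string }[]>();
const usersFailed = failedState<{ id: number; name: string }[], string>('Request failed');
```

In the test above, the object passed to `mockRequest.state$Trigger.next(...)` could then be replaced by `nextState([{ id: 1, name: 'Test' }])`.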

Real-World Impact

After implementing rx-stateful in production applications, teams have reported:

  • Reduction in async-related bugs
  • Improved user experience with non-flickering loaders
  • Faster development with less boilerplate
  • Better maintainability with cleaner, more declarative code
  • Consistent error handling across the application

Conclusion

@angular-kit/rx-stateful is a great fit for reducing the complexity of async state management and eliminating boilerplate code.

Getting Started

npm install @angular-kit/rx-stateful

import { rxRequest } from '@angular-kit/rx-stateful';

// Start using it immediately
const myRequest = rxRequest({
  requestFn: () => fetch('/api/data').then(r => r.json()),
  config: {
    suspenseThresholdMs: 500,
    suspenseTimeMs: 500
  }
});

myRequest.value$().subscribe(state => {
  console.log('Current state:', state);
});


Ready to revolutionize your async state management? Start using @angular-kit/rx-stateful today and experience the difference firsthand.