09 — RxJS & Observables

What is RxJS?

RxJS is a library for reactive programming using Observables. Angular uses it extensively for:

  • HTTP responses
  • Router events
  • Form value changes
  • Component communication
  • State management

Data flow: Observable (emits values over time) → operators (transform them) → Observer (receives them)
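
The Observable → operators → Observer flow can be sketched in a few lines. This is a minimal illustration rather than Angular code; the values and the names `source$` and `log` are made up for the example.

```typescript
import { of, filter, map } from 'rxjs';

const log: string[] = [];

// Observable: emits 1, 2, 3, 4 synchronously, then completes.
const source$ = of(1, 2, 3, 4);

source$
  .pipe(
    filter(n => n % 2 === 0), // operator: keep even numbers
    map(n => n * 10),         // operator: transform each remaining value
  )
  .subscribe({
    // Observer: receives the transformed values and the completion signal
    next: value => log.push(`next ${value}`),
    complete: () => log.push('complete'),
  });

console.log(log); // ['next 20', 'next 40', 'complete']
```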

Observable vs Promise

| | Observable | Promise |
|---|---|---|
| Values | Multiple values over time | Single value |
| Lazy | Yes: runs only when subscribed | No: runs immediately |
| Cancellable | Yes: unsubscribe() | No |
| Operators | map, filter, switchMap, etc. | .then(), .catch() |
| Use in Angular | HTTP, events, state | Can be used via firstValueFrom() |
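
The "Lazy" row and the `firstValueFrom()` bridge are easiest to see with a counter. The sketch below is illustrative only: `answer$` and `runs` are invented names, and the producer stands in for real work such as an HTTP call.

```typescript
import { Observable, firstValueFrom } from 'rxjs';

let runs = 0;

// The function passed to the constructor is the producer.
// It does not run here; it runs once per subscription.
const answer$ = new Observable<number>(subscriber => {
  runs += 1;
  subscriber.next(42);
  subscriber.complete();
});

const runsBeforeSubscribe = runs;     // still 0: nothing has subscribed yet

answer$.subscribe();
answer$.subscribe();
const runsAfterTwoSubscribers = runs; // 2: each subscriber triggered its own run

// Bridge to Promise-based code: firstValueFrom subscribes right away and
// resolves with the first emitted value.
const answer: Promise<number> = firstValueFrom(answer$);
answer.then(value => console.log('resolved with', value));
```

A Promise, by contrast, would have started its work as soon as it was created and would hand the same result to every `.then()` caller.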

Creating Observables

import { Observable, of, from, interval, timer, fromEvent, Subject, BehaviorSubject } from 'rxjs';

// From static values
const nums$ = of(1, 2, 3);
const arr$  = from([10, 20, 30]);
const promise$ = from(fetch('/api/data'));

// Time-based
const every1s$ = interval(1000);          // 0, 1, 2, 3... every second
const after3s$ = timer(3000);             // Emit once after 3 seconds
const repeat$  = timer(0, 5000);          // Emit immediately, then every 5s

// From DOM events
const clicks$ = fromEvent(document, 'click');

// Custom observable
const custom$ = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  setTimeout(() => subscriber.next(3), 1000);
  setTimeout(() => subscriber.complete(), 2000);
  // Teardown logic
  return () => console.log('Unsubscribed');
});
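
The teardown function returned from a custom observable runs when the subscriber unsubscribes and also when the stream ends by itself. A small sketch of both cases, with invented names (`endless$`, `finite$`, `log`):

```typescript
import { Observable } from 'rxjs';

const log: string[] = [];

// A producer that never completes by itself (like an event listener).
const endless$ = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  return () => log.push('teardown (endless)');
});

// A producer that completes by itself.
const finite$ = new Observable<number>(subscriber => {
  subscriber.next(10);
  subscriber.complete();
  return () => log.push('teardown (finite)');
});

const subscription = endless$.subscribe(v => log.push(`endless value ${v}`));
subscription.unsubscribe(); // runs the teardown function

finite$.subscribe({
  next: v => log.push(`finite value ${v}`),
  complete: () => log.push('finite complete'),
});

console.log(log);
```

In real code the teardown is where timers are cleared and event listeners removed.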

Essential Operators

Transformation

import { map, switchMap, mergeMap, concatMap, exhaustMap, debounceTime, distinctUntilChanged } from 'rxjs';

// map — transform each value
users$.pipe(map(users => users.filter(u => u.active)));

// switchMap — cancel previous, switch to new (MOST USED)
// Use for: search, route param changes, dropdown selection
searchTerm$.pipe(
  debounceTime(300),
  distinctUntilChanged(),
  switchMap(term => this.http.get(`/api/search?q=${term}`)),
);

// mergeMap — run in parallel (no cancellation)
// Use for: fire-and-forget, logging, analytics
items$.pipe(mergeMap(item => this.http.post('/api/track', item)));

// concatMap — queue and run sequentially
// Use for: ordered operations (file uploads one by one)
files$.pipe(concatMap(file => this.uploadService.upload(file)));

// exhaustMap — ignore new while previous is in-flight
// Use for: login/submit buttons (prevent double-click)
loginClick$.pipe(exhaustMap(() => this.authService.login(credentials)));
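
The difference between `switchMap` and `exhaustMap` shows up when a second trigger arrives while the first request is still pending. The sketch below replaces HTTP with hand-resolved Subjects; `fakeRequest` and `respond` are hypothetical helpers written for this example, not RxJS or Angular APIs.

```typescript
import { Subject, switchMap, exhaustMap } from 'rxjs';

// Hypothetical stand-in for an HTTP call: each request is a Subject that
// stays pending until respond() is called for it.
const pending = new Map<string, Subject<string>>();

function fakeRequest(key: string): Subject<string> {
  const response$ = new Subject<string>();
  pending.set(key, response$);
  return response$;
}

function respond(key: string): void {
  const response$ = pending.get(key);
  if (!response$) return; // the request was never started
  response$.next(`response for ${key}`);
  response$.complete();
}

const trigger$ = new Subject<string>();
const switched: string[] = [];
const exhausted: string[] = [];

trigger$
  .pipe(switchMap(id => fakeRequest(`switch-${id}`)))
  .subscribe(v => switched.push(v));
trigger$
  .pipe(exhaustMap(id => fakeRequest(`exhaust-${id}`)))
  .subscribe(v => exhausted.push(v));

trigger$.next('a'); // both operators start request "a"
trigger$.next('b'); // switchMap drops "a" and starts "b"; exhaustMap ignores "b"

respond('switch-a');  // nobody is listening any more
respond('switch-b');
respond('exhaust-a');
respond('exhaust-b'); // never started, so nothing happens

console.log(switched);  // ['response for switch-b']
console.log(exhausted); // ['response for exhaust-a']
```

Because `switchMap` discards the earlier request, it suits reads such as search; for writes where every request must finish, `concatMap` or `mergeMap` is usually the safer choice.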

Filtering

import { filter, take, first, distinctUntilChanged, skip, debounceTime, throttleTime } from 'rxjs';

// filter — only pass matching values
nums$.pipe(filter(n => n > 5));

// take — take first N values then complete
events$.pipe(take(3));

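// first: take the first value (or the first match), then complete.
// Errors with EmptyError if the source completes without a match.
// user$ is assumed to emit one User at a time.
user$.pipe(first(u => u.role === 'admin'));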

// distinctUntilChanged — skip consecutive duplicates
input$.pipe(distinctUntilChanged());

// debounceTime — wait for pause in emissions
searchInput$.pipe(debounceTime(300));

// throttleTime — emit at most once per interval
scroll$.pipe(throttleTime(200));
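
The non-timing filters can be checked with synchronous sources. A small sketch; `collect` is a helper written only for this example, and the input values are arbitrary.

```typescript
import { Observable, of, filter, take, first, distinctUntilChanged } from 'rxjs';

// Helper for this sketch: gather everything a synchronous observable emits.
function collect<T>(source$: Observable<T>): T[] {
  const values: T[] = [];
  source$.subscribe(value => values.push(value));
  return values;
}

const firstThreeAboveFive = collect(
  of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).pipe(filter(n => n > 5), take(3)),
); // [6, 7, 8]

// Only consecutive duplicates are dropped; the final 'a' gets through.
const withoutRepeats = collect(
  of('a', 'a', 'b', 'b', 'a').pipe(distinctUntilChanged()),
); // ['a', 'b', 'a']

const firstMatch = collect(of(1, 2, 3).pipe(first(n => n > 1))); // [2]

// A default value avoids the EmptyError that first() raises when nothing matches.
const noMatchWithDefault = collect(of(1, 2, 3).pipe(first(n => n > 10, -1))); // [-1]

console.log(firstThreeAboveFive, withoutRepeats, firstMatch, noMatchWithDefault);
```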

Combination

import { combineLatest, forkJoin, merge, concat, withLatestFrom } from 'rxjs';

// combineLatest — latest from each (re-emits when ANY source emits)
combineLatest([user$, settings$]).pipe(
  map(([user, settings]) => ({ user, settings })),
);

// forkJoin — wait for ALL to complete (like Promise.all)
forkJoin({
  users: this.http.get<User[]>('/api/users'),
  roles: this.http.get<Role[]>('/api/roles'),
}).subscribe(({ users, roles }) => { ... });

// merge — interleave emissions from multiple sources
merge(click$, keypress$, touch$).subscribe(event => handleInput(event));

// withLatestFrom — combine with latest from another source
save$.pipe(
  withLatestFrom(form$),
  map(([_, formValue]) => formValue),
);
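
`combineLatest` and `withLatestFrom` look alike but differ in what triggers an emission. The sketch below drives two Subjects by hand to show it, and adds a synchronous `forkJoin` for comparison. All names (`letter$`, `digit$`, and so on) are illustrative.

```typescript
import { Subject, of, combineLatest, forkJoin, withLatestFrom } from 'rxjs';

const letter$ = new Subject<string>();
const digit$ = new Subject<number>();

const combined: string[] = [];
const sampled: string[] = [];

combineLatest([letter$, digit$]).subscribe(([l, d]) => combined.push(`${l}${d}`));
letter$.pipe(withLatestFrom(digit$)).subscribe(([l, d]) => sampled.push(`${l}${d}`));

letter$.next('x'); // neither emits: digit$ has no value yet
digit$.next(1);    // combineLatest emits x1; withLatestFrom stays silent
letter$.next('y'); // both emit y1
digit$.next(2);    // combineLatest emits y2; withLatestFrom stays silent

// forkJoin emits once, with the LAST value of each source, after all complete.
const joined: Array<{ count: number; label: string }> = [];
forkJoin({ count: of(1, 2, 3), label: of('done') }).subscribe(v => joined.push(v));

console.log(combined); // ['x1', 'y1', 'y2']
console.log(sampled);  // ['y1']
console.log(joined);   // [{ count: 3, label: 'done' }]
```

Note that `forkJoin` never emits if one of its sources never completes, which is why it pairs well with HTTP calls but not with Subjects that stay open.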

Error Handling

import { catchError, retry, throwError, of, EMPTY } from 'rxjs';

// catchError — handle error, return fallback
this.http.get('/api/data').pipe(
  catchError(err => {
    console.error(err);
    return of([]);              // Return fallback value
  }),
);

// retry — retry N times on error
this.http.get('/api/data').pipe(retry(3));

// retry with delay
this.http.get('/api/data').pipe(
  retry({ count: 3, delay: 1000 }),
);

// catchError + re-throw
this.http.get('/api/data').pipe(
  catchError(err => {
    this.toastService.error('Failed to load');
    return throwError(() => err);   // Re-throw for caller to handle
  }),
);
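
The three error-handling strategies can be tried without a server. In the sketch below, `flaky$` is a hypothetical source that fails on its first two subscriptions and then succeeds; it stands in for an unreliable HTTP endpoint.

```typescript
import { defer, of, throwError, catchError, retry } from 'rxjs';

let attempts = 0;

// Hypothetical flaky source: fails on the first two subscriptions, then succeeds.
const flaky$ = defer(() => {
  attempts += 1;
  return attempts < 3
    ? throwError(() => new Error(`attempt ${attempts} failed`))
    : of('payload');
});

const outcomes: string[] = [];

// retry(2) resubscribes up to two times, so the third attempt succeeds.
flaky$.pipe(retry(2)).subscribe(value => outcomes.push(value));
const attemptsUsed = attempts; // 3

// catchError swaps a failed stream for a fallback stream.
throwError(() => new Error('boom'))
  .pipe(catchError(() => of('fallback')))
  .subscribe(value => outcomes.push(value));

// Re-throwing keeps the error visible to the subscriber.
throwError(() => new Error('boom'))
  .pipe(catchError(err => throwError(() => err)))
  .subscribe({ error: (err: Error) => outcomes.push(`error: ${err.message}`) });

console.log(outcomes); // ['payload', 'fallback', 'error: boom']
```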

Utility

import { tap, finalize, delay, shareReplay, startWith } from 'rxjs';

// tap — side effect without modifying the stream
this.http.get('/api/users').pipe(
  tap(users => console.log('Fetched:', users.length)),
);

// finalize: run cleanup when the observable completes, errors, or is unsubscribed
this.http.post('/api/save', data).pipe(
  finalize(() => this.loading = false),
);

// startWith — emit initial value before source
users$.pipe(startWith([]));

// shareReplay — cache last emission, share among subscribers
users$ = this.http.get<User[]>('/api/users').pipe(shareReplay(1));
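
What `shareReplay(1)` saves becomes visible when executions are counted. In this sketch `request$` is a hypothetical stand-in for an HTTP GET: every subscription counts as one request.

```typescript
import { defer, of, tap, finalize, shareReplay } from 'rxjs';

let executions = 0;
const log: string[] = [];

// Hypothetical stand-in for an HTTP GET: every subscription is a new "request".
const request$ = defer(() => {
  executions += 1;
  return of(['ada', 'grace']);
}).pipe(
  tap(users => log.push(`fetched ${users.length}`)), // side effect only
  finalize(() => log.push('finalized')),             // cleanup when the stream ends
);

request$.subscribe();
request$.subscribe();
const executionsWithoutSharing = executions; // 2: one per subscriber

const cached$ = request$.pipe(shareReplay(1));
const seenByLateSubscriber: string[][] = [];
cached$.subscribe();
cached$.subscribe(users => seenByLateSubscriber.push(users));
const executionsWithSharing = executions - executionsWithoutSharing; // 1

console.log(executionsWithoutSharing, executionsWithSharing, seenByLateSubscriber);
```

The second subscriber to `cached$` receives the replayed value without triggering another request.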

Subjects

Subjects are both Observable and Observer — they can emit values AND be subscribed to.

| Type | Behavior |
|---|---|
| Subject | No initial value, only future emissions |
| BehaviorSubject | Has initial value, emits latest to new subscribers |
| ReplaySubject | Replays N past values to new subscribers |
| AsyncSubject | Emits only the last value when it completes |

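// BehaviorSubject: most common for state
// (User and LoginDto are app-specific types assumed to be defined elsewhere)
import { Injectable, inject } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { BehaviorSubject, tap } from 'rxjs';

@Injectable({ providedIn: 'root' })
export class AuthService {
  private http = inject(HttpClient);
  private userSubject = new BehaviorSubject<User | null>(null);
  user$ = this.userSubject.asObservable();   // Expose as read-only

  login(credentials: LoginDto) {
    return this.http.post<User>('/api/login', credentials).pipe(
      tap(user => this.userSubject.next(user)),   // Publish the logged-in user
    );
  }

  logout() {
    this.userSubject.next(null);
  }

  get currentUser() { return this.userSubject.value; }   // Sync access
}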
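
The four Subject types differ mainly in what a late subscriber receives. The sketch below emits 1, 2, 3 before anyone subscribes, then emits 4 and completes; the `subjects` and `seen` structures are just scaffolding for the comparison.

```typescript
import { Subject, BehaviorSubject, ReplaySubject, AsyncSubject } from 'rxjs';

const subjects: Array<[string, Subject<number>]> = [
  ['Subject', new Subject<number>()],
  ['BehaviorSubject', new BehaviorSubject<number>(0)], // initial value 0
  ['ReplaySubject', new ReplaySubject<number>(2)],     // buffers the last 2 values
  ['AsyncSubject', new AsyncSubject<number>()],
];

const seen: Record<string, number[]> = {};

// Emit before anyone is listening.
subjects.forEach(([, subject]) => {
  subject.next(1);
  subject.next(2);
  subject.next(3);
});

// Subscribe late.
subjects.forEach(([name, subject]) => {
  const values: number[] = [];
  seen[name] = values;
  subject.subscribe(v => values.push(v));
});

// Emit once more, then complete.
subjects.forEach(([, subject]) => {
  subject.next(4);
  subject.complete();
});

console.log(seen);
// Subject: [4], BehaviorSubject: [3, 4], ReplaySubject: [2, 3, 4], AsyncSubject: [4]
```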

Common Patterns

Search with Debounce

searchControl = new FormControl('', { nonNullable: true });   // Value is always a string, never null

results$ = this.searchControl.valueChanges.pipe(
  debounceTime(300),
  distinctUntilChanged(),
  filter(term => term.length >= 2),
  switchMap(term => this.searchService.search(term)),
);
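
The search pipeline can be exercised outside Angular by simulating keystrokes. The sketch below is one possible way to do that and makes several assumptions: it passes RxJS's `VirtualTimeScheduler` to the time-based operators so that the delays run instantly, `typed$` replaces `valueChanges`, and `search` is a hypothetical function in place of `searchService.search`.

```typescript
import {
  VirtualTimeScheduler, merge, of,
  delay, debounceTime, distinctUntilChanged, filter, switchMap,
} from 'rxjs';

// Virtual clock: scheduled work runs when flush() is called, not in real time.
const scheduler = new VirtualTimeScheduler();

// Simulated typing: three quick keystrokes, a pause, then one more.
const typed$ = merge(
  of('r').pipe(delay(0, scheduler)),
  of('rx').pipe(delay(100, scheduler)),
  of('rxj').pipe(delay(200, scheduler)),
  of('rxjs').pipe(delay(1000, scheduler)),
);

// Hypothetical search call; a real one would return an HTTP observable.
const searches: string[] = [];
const search = (term: string) => {
  searches.push(term);
  return of(`results for "${term}"`);
};

const results: string[] = [];
typed$
  .pipe(
    debounceTime(300, scheduler),     // wait for a 300 ms pause in typing
    distinctUntilChanged(),           // skip if the term did not change
    filter(term => term.length >= 2), // ignore very short terms
    switchMap(term => search(term)),  // drop any earlier, slower search
  )
  .subscribe(r => results.push(r));

scheduler.flush(); // run all scheduled virtual-time work immediately

console.log(searches); // ['rxj', 'rxjs']
console.log(results);
```

Four keystrokes lead to only two searches: the first three are collapsed by the debounce into a single search for "rxj".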

Polling

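data$ = timer(0, 30000).pipe(         // Immediately, then every 30s
  switchMap(() => this.http.get('/api/status')),
  // refCount: true stops the timer once the last subscriber unsubscribes;
  // plain shareReplay(1) would keep polling in the background
  shareReplay({ bufferSize: 1, refCount: true }),
);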

Loading State

private loadingSubject = new BehaviorSubject(false);
loading$ = this.loadingSubject.asObservable();

fetchData() {
  this.loadingSubject.next(true);
  return this.http.get('/api/data').pipe(
    finalize(() => this.loadingSubject.next(false)),
  );
}
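
One variation on the loading pattern, sketched in plain RxJS: the flag is set inside `defer`, so it flips to `true` when the request is subscribed rather than when the method is called. `trackLoading` is a hypothetical helper invented for this example.

```typescript
import { BehaviorSubject, Observable, defer, finalize, of, throwError } from 'rxjs';

const loadingSubject = new BehaviorSubject<boolean>(false);
const loadingHistory: boolean[] = [];
loadingSubject.subscribe(isLoading => loadingHistory.push(isLoading));

// Hypothetical helper: sets the flag on subscribe and clears it on
// complete, error, or unsubscribe.
function trackLoading<T>(source$: Observable<T>): Observable<T> {
  return defer(() => {
    loadingSubject.next(true);
    return source$;
  }).pipe(finalize(() => loadingSubject.next(false)));
}

// Success path.
trackLoading(of('data')).subscribe();

// Error path: the flag is still reset.
trackLoading(throwError(() => new Error('boom'))).subscribe({ error: () => {} });

console.log(loadingHistory); // [false, true, false, true, false]
```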

Unsubscribing — Preventing Memory Leaks

// 1. async pipe (BEST — auto-unsubscribes)
// In template: {{ data$ | async }}

// 2. takeUntilDestroyed (Angular 16+)
private destroyRef = inject(DestroyRef);

ngOnInit() {
  source$.pipe(takeUntilDestroyed(this.destroyRef)).subscribe(...);
}

// 3. take(1) — for one-time reads
this.http.get('/api/config').pipe(take(1)).subscribe(...);
// HTTP observables complete after one emission anyway, but it's explicit

// 4. Manual (avoid if possible)
private sub = source$.subscribe(...);
ngOnDestroy() { this.sub.unsubscribe(); }
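
Outside Angular, the same idea as option 2 can be sketched with the plain `takeUntil` operator and a notifier Subject. Here `destroy$` is a hypothetical signal playing the role of the component's destruction; this illustrates the concept and makes no claim about how `takeUntilDestroyed` is implemented.

```typescript
import { Subject, takeUntil } from 'rxjs';

const source$ = new Subject<number>();
const destroy$ = new Subject<void>(); // hypothetical "component destroyed" signal
const log: string[] = [];

source$.pipe(takeUntil(destroy$)).subscribe({
  next: v => log.push(`next ${v}`),
  complete: () => log.push('complete'),
});

source$.next(1);
source$.next(2);
destroy$.next();  // what a destroy hook would trigger
source$.next(3);  // ignored: the subscription is already closed

console.log(log); // ['next 1', 'next 2', 'complete']
```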

Key Takeaways

  • switchMap is your default flattening operator — cancels previous requests
  • Use exhaustMap for form submissions / login to prevent double-clicks
  • BehaviorSubject is the go-to for shared state in services
  • shareReplay(1) shares one execution of a cold observable (HTTP) and replays the last value to late subscribers
  • Always unsubscribe — prefer async pipe > takeUntilDestroyed > manual
  • forkJoin = Promise.all for observables; combineLatest = reactive combination
  • debounceTime + distinctUntilChanged + switchMap = the search pattern