Docs
/
Angular
Chapter 9
09 — RxJS & Observables
What is RxJS?
RxJS is a library for reactive programming using Observables. Angular uses it extensively for:
- HTTP responses
- Router events
- Form value changes
- Component communication
- State management
Observable → emits values over time → Operators transform them → Observer receives them
Observable vs Promise
| Observable | Promise | |
|---|---|---|
| Values | Multiple values over time | Single value |
| Lazy | Yes — runs only when subscribed | No — runs immediately |
| Cancellable | Yes — unsubscribe() | No |
| Operators | map, filter, switchMap, etc. | .then(), .catch() |
| Use in Angular | HTTP, events, state | Can be used via firstValueFrom() |
Creating Observables
import { Observable, of, from, interval, timer, fromEvent, Subject, BehaviorSubject } from 'rxjs';
// From static values
const nums$ = of(1, 2, 3);
const arr$ = from([10, 20, 30]);
const promise$ = from(fetch('/api/data'));
// Time-based
const every1s$ = interval(1000); // 0, 1, 2, 3... every second
const after3s$ = timer(3000); // Emit once after 3 seconds
const repeat$ = timer(0, 5000); // Emit immediately, then every 5s
// From DOM events
const clicks$ = fromEvent(document, 'click');
// Custom observable
const custom$ = new Observable<number>(subscriber => {
subscriber.next(1);
subscriber.next(2);
setTimeout(() => subscriber.next(3), 1000);
setTimeout(() => subscriber.complete(), 2000);
// Teardown logic
return () => console.log('Unsubscribed');
});
Essential Operators
Transformation
import { map, switchMap, mergeMap, concatMap, exhaustMap } from 'rxjs';
// map — transform each value
users$.pipe(map(users => users.filter(u => u.active)));
// switchMap — cancel previous, switch to new (MOST USED)
// Use for: search, route param changes, dropdown selection
searchTerm$.pipe(
debounceTime(300),
distinctUntilChanged(),
switchMap(term => this.http.get(`/api/search?q=${term}`)),
);
// mergeMap — run in parallel (no cancellation)
// Use for: fire-and-forget, logging, analytics
items$.pipe(mergeMap(item => this.http.post('/api/track', item)));
// concatMap — queue and run sequentially
// Use for: ordered operations (file uploads one by one)
files$.pipe(concatMap(file => this.uploadService.upload(file)));
// exhaustMap — ignore new while previous is in-flight
// Use for: login/submit buttons (prevent double-click)
loginClick$.pipe(exhaustMap(() => this.authService.login(credentials)));
Filtering
import { filter, take, first, distinctUntilChanged, skip, debounceTime, throttleTime } from 'rxjs';
// filter — only pass matching values
nums$.pipe(filter(n => n > 5));
// take — take first N values then complete
events$.pipe(take(3));
// first — take first value (or first matching) then complete
users$.pipe(first(u => u.role === 'admin'));
// distinctUntilChanged — skip consecutive duplicates
input$.pipe(distinctUntilChanged());
// debounceTime — wait for pause in emissions
searchInput$.pipe(debounceTime(300));
// throttleTime — emit at most once per interval
scroll$.pipe(throttleTime(200));
Combination
import { combineLatest, forkJoin, merge, concat, withLatestFrom } from 'rxjs';
// combineLatest — latest from each (re-emits when ANY source emits)
combineLatest([user$, settings$]).pipe(
map(([user, settings]) => ({ user, settings })),
);
// forkJoin — wait for ALL to complete (like Promise.all)
forkJoin({
users: this.http.get<User[]>('/api/users'),
roles: this.http.get<Role[]>('/api/roles'),
}).subscribe(({ users, roles }) => { ... });
// merge — interleave emissions from multiple sources
merge(click$, keypress$, touch$).subscribe(event => handleInput(event));
// withLatestFrom — combine with latest from another source
save$.pipe(
withLatestFrom(form$),
map(([_, formValue]) => formValue),
);
Error Handling
import { catchError, retry, retryWhen, throwError, EMPTY } from 'rxjs';
// catchError — handle error, return fallback
this.http.get('/api/data').pipe(
catchError(err => {
console.error(err);
return of([]); // Return fallback value
}),
);
// retry — retry N times on error
this.http.get('/api/data').pipe(retry(3));
// retry with delay
this.http.get('/api/data').pipe(
retry({ count: 3, delay: 1000 }),
);
// catchError + re-throw
this.http.get('/api/data').pipe(
catchError(err => {
this.toastService.error('Failed to load');
return throwError(() => err); // Re-throw for caller to handle
}),
);
Utility
import { tap, finalize, delay, shareReplay, startWith } from 'rxjs';
// tap — side effect without modifying the stream
this.http.get('/api/users').pipe(
tap(users => console.log('Fetched:', users.length)),
);
// finalize — run cleanup when observable completes or errors
this.http.post('/api/save', data).pipe(
finalize(() => this.loading = false),
);
// startWith — emit initial value before source
users$.pipe(startWith([]));
// shareReplay — cache last emission, share among subscribers
users$ = this.http.get<User[]>('/api/users').pipe(shareReplay(1));
Subjects
Subjects are both Observable and Observer — they can emit values AND be subscribed to.
| Type | Behavior |
|---|---|
Subject | No initial value, only future emissions |
BehaviorSubject | Has initial value, emits latest to new subscribers |
ReplaySubject | Replays N past values to new subscribers |
AsyncSubject | Emits only the last value when it completes |
// BehaviorSubject — most common for state
@Injectable({ providedIn: 'root' })
export class AuthService {
private userSubject = new BehaviorSubject<User | null>(null);
user$ = this.userSubject.asObservable(); // Expose as read-only
login(credentials: LoginDto) {
return this.http.post<User>('/api/login', credentials).pipe(
tap(user => this.userSubject.next(user)),
);
}
logout() {
this.userSubject.next(null);
}
get currentUser() { return this.userSubject.value; } // Sync access
}
Common Patterns
Search with Debounce
searchControl = new FormControl('');
results$ = this.searchControl.valueChanges.pipe(
debounceTime(300),
distinctUntilChanged(),
filter(term => term!.length >= 2),
switchMap(term => this.searchService.search(term!)),
);
Polling
data$ = timer(0, 30000).pipe( // Immediately, then every 30s
switchMap(() => this.http.get('/api/status')),
shareReplay(1),
);
Loading State
private loadingSubject = new BehaviorSubject(false);
loading$ = this.loadingSubject.asObservable();
fetchData() {
this.loadingSubject.next(true);
return this.http.get('/api/data').pipe(
finalize(() => this.loadingSubject.next(false)),
);
}
Unsubscribing — Preventing Memory Leaks
// 1. async pipe (BEST — auto-unsubscribes)
// In template: {{ data$ | async }}
// 2. takeUntilDestroyed (Angular 16+)
private destroyRef = inject(DestroyRef);
ngOnInit() {
source$.pipe(takeUntilDestroyed(this.destroyRef)).subscribe(...);
}
// 3. take(1) — for one-time reads
this.http.get('/api/config').pipe(take(1)).subscribe(...);
// HTTP observables complete after one emission anyway, but it's explicit
// 4. Manual (avoid if possible)
private sub = source$.subscribe(...);
ngOnDestroy() { this.sub.unsubscribe(); }
Key Takeaways
switchMapis your default flattening operator — cancels previous requests- Use
exhaustMapfor form submissions / login to prevent double-clicks BehaviorSubjectis the go-to for shared state in servicesshareReplay(1)turns a cold observable (HTTP) into a warm cached one- Always unsubscribe — prefer
asyncpipe >takeUntilDestroyed> manual forkJoin= Promise.all for observables;combineLatest= reactive combinationdebounceTime+distinctUntilChanged+switchMap= the search pattern