Co da ci RxJS w jeden weekend
Realistyczny zakres: ile da się opanować w 2–3 dni
Weekend wystarczy, żeby zbudować solidny fundament RxJS: rozumieć, czym jest Observable, jak działają podstawowe operatory i jak kontrolować cykl życia subskrypcji. To już pozwala pisać sensowne, reaktywne fragmenty kodu w interfejsie i przy pracy z API.
W tak krótkim czasie nie opanujesz wszystkich operatorów, złożonych wzorców architektonicznych ani całego ekosystemu. Nie musisz. W praktyce większość frontendowych zadań da się ogarnąć kilkunastoma operatorami i kilkoma prostymi wzorcami. Reszta to detale, które dojdziesz w projektach.
Po jednym intensywnym weekendzie powinieneś:
- swobodnie tworzyć observables z eventów, tablic i HTTP,
- używać kluczowych operatorów: map, filter, tap, switchMap, mergeMap, debounceTime, distinctUntilChanged, catchError,
- rozumieć różnicę między Subject a BehaviorSubject,
- opanować podstawy zarządzania subskrypcjami i unikania wycieków pamięci,
- spiąć to w jedną, prostą mini-aplikację reaktywną.
Na później zostają tematy typu: schedulery, zaawansowane łączenie wielu strumieni, wzorce jak CQRS/Redux oparte na RxJS czy optymalizacje pod duże aplikacje.
Jak wyglądają aplikacje bez RxJS
W klasycznym kodzie JS/TS interakcje asynchroniczne są klejone ręcznie: callbacki w callbackach, gąszcz .then(), setTimeouty, event listenery rozsiane po plikach. Z czasem utrzymanie takiego kodu staje się koszmarem.
Typowe problemy bez programowania reaktywnego:
- Callback hell – zagnieżdżone funkcje, które trudno czytać i testować.
- Promise spaghetti – łańcuchy
then/catch/finally, w które trudno wstrzyknąć dodatkową logikę (np. retry, timeout, cancel). - Ręczne zarządzanie eventami – każdy event listener gdzieś indziej, brak centralnej logiki łączenia i filtrowania zdarzeń.
- Duplikacja kodu – to samo podejście do obsługi błędów, loadera, ponawiania zapytań kopiowane w wielu miejscach.
RxJS pozwala traktować to wszystko jak jeden mechanizm: strumienie zdarzeń przekształcasz operatorami, a efektem jest prostszy, bardziej przewidywalny kod.
Typowe zastosowania RxJS w codziennej pracy
RxJS wchodzi do gry wszędzie tam, gdzie dane się zmieniają w czasie lub reagują na zdarzenia użytkownika.
- UI – reagowanie na kliknięcia, ruch myszką, przewijanie (scroll), zmiany w polach formularzy.
- API – kolejki zapytań HTTP, anulowanie starych zapytań, debouncing wyszukiwania, retry przy błędach.
- Autosave – zapisywanie wersji roboczej po chwili bezczynności, bez zasypywania serwera żądaniami.
- Filtrowanie i sortowanie – dynamiczne listy i tabele reagujące na kombinację wielu filtrów.
- WebSocket / SSE – strumień danych „na żywo”, np. notyfikacje, czaty, wykresy.
Jeśli tworzysz front w Angularze, RxJS spotykasz niemal wszędzie: HttpClient, formularze reaktywne, Router, async pipe w szablonach.
Minimalny zestaw pojęć na start
Żeby nie utknąć na podstawach, w weekend ogarnij te elementy:
- Observable – strumień emitujący wartości w czasie.
- Observer – obiekt/funkcja, która odbiera wartości (
next,error,complete). - Subscription – uchwyt do aktywnej subskrypcji, pozwala ją przerwać.
- Operatory – funkcje przekształcające strumienie (np.
map,filter,switchMap). - Subject/BehaviorSubject – aktywne źródła danych, do których sam możesz „wypychać” wartości.
- Hot vs cold – różnica między strumieniem, który zaczyna emitować dopiero przy subskrypcji, a takim, który emituje niezależnie od subskrybentów.
Ten pakiet wystarcza, by swobodnie poruszać się w prostych i średnio złożonych scenariuszach RxJS w aplikacjach webowych.
O co chodzi w programowaniu reaktywnym (bez filozofii)
Wartości w czasie zamiast pojedynczej wartości
Klasyczna zmienna to fotografia: jedna wartość w danej chwili. Strumień to film: sekwencja wartości w czasie. Programowanie reaktywne skupia się na pracy z tym filmem zamiast z pojedynczym kadrem.
W RxJS Observable to właśnie taka „wartość w czasie”. Nie mówisz „daj mi wartość teraz”, tylko „powiedz mi o każdej nowej wartości, która się pojawi”. Ustalasz, jak reagujesz, gdy ta wartość się zmienia.
Dla przykładu: wpisywana fraza wyszukiwania to nie jedna wartość, tylko strumień kolejnych tekstów po każdym naciśnięciu klawisza. RxJS pozwala ten strumień:
- przefiltrować (np. ignorować długość < 3),
- odszumić (debounce),
- przekształcić w zapytania HTTP,
- złączyć wyniki z innymi strumieniami stanu (loader, błędy).
Imperatywny kod vs podejście reaktywne
Imperatywnie piszesz „krok po kroku”: pobierz, poczekaj, zrób coś, wyświetl. Kod jest sekwencją instrukcji. Gdy pojawia się wiele źródeł zdarzeń (user, API, WebSocket), logika zaczyna się przeplatać i mieszać.
Reaktywnie opisujesz zależności: dane A zależą od B i C, a UI reaguje na zmiany A. Reakcje są deklaratywne – opisujesz co ma się dziać, gdy napływają kolejne wartości.
Zamiast:
button.addEventListener('click', () => {
fetchData().then(result => {
render(result);
}).catch(showError);
});
budujesz strumień kliknięć i przepuszczasz go przez operatory, łącząc z HTTP i UI. Efekt jest często krótszy, czytelniejszy i łatwiejszy do testowania.
Excel jako intuicyjna analogia
Excel jest dobrym obrazem programowania reaktywnego. Komórka z formułą =A1+B1 „słucha” zmian w A1 i B1. Gdy zmienisz A1, arkusz automatycznie przelicza wynik, bez twojej ręcznej ingerencji.
W kodzie reaktywnym tworzysz „komórki” zależne od innych strumieni: np. total$ zależne od items$ i discount$. Gdy którykolwiek z nich się zmieni, nowa wartość totalu pojawi się automatycznie.
To dokładnie to samo: deklarujesz zależności, a system dba o propagację zmian. RxJS jest takim „silnikiem Excela” dla twojej aplikacji.
Skąd biorą się strumienie w aplikacji frontowej
Najczęstsze źródła strumieni w przeglądarce:
- Zdarzenia DOM – kliknięcia, input, scroll, keyup, mousemove.
- HTTP – odpowiedzi z API, ponawiane żądania, polling.
- WebSocket / SSE – wiadomości przychodzące z serwera w czasie rzeczywistym.
- Czas – interwały, timeouty, zegary, animacje.
- Wewnętrzny stan aplikacji – np. aktualny użytkownik, koszyk, uprawnienia.
RxJS pozwala zamienić każdy z tych obszarów na Observable, a potem spokojnie je składać, filtrować i transformować.

Rdzeń RxJS w praktyce: Observable, Observer, Subscription
Observable: źródło, które pcha dane
Observable to obiekt reprezentujący strumień danych. Emisja odbywa się przez trzy kanały:
next(value)– nowa wartość,error(err)– błąd kończący strumień,complete()– normalne zakończenie strumienia.
Tworzenie własnego observable:
import { Observable } from 'rxjs';
const my$ = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.complete();
});
W praktyce częściej sięgasz po gotowe „fabryki” observabli zamiast pisać je od zera.
Fabryki observable: of, from, fromEvent, interval
Najczęściej używane funkcje tworzące strumienie:
- of(…values) – emituje podane wartości i natychmiast się kończy:
import { of } from 'rxjs';
const numbers$ = of(1, 2, 3); // 1, 2, 3, complete
- from(iterable/promise) – zamienia tablicę, mapę, Promise na observable:
import { from } from 'rxjs';
const arr$ = from([1, 2, 3]); // 1, 2, 3, complete
const promise$ = from(fetch('/api/data')); // odpowiedź jako wartość
- fromEvent(target, eventName) – strumień zdarzeń DOM lub innych emitterów:
import { fromEvent } from 'rxjs';
const clicks$ = fromEvent(button, 'click');
- interval(ms) – nieskończony strumień liczb (0, 1, 2, …) co zadany interwał:
import { interval } from 'rxjs';
const tick$ = interval(1000); // 0,1,2,... co sekundę
Observer: jak odbierać next, error i complete
Subskrypcja to przekazanie observera do observable. Observer to obiekt z trzema funkcjami: next, error, complete. Można też podać tylko funkcję next.
const subscription = numbers$.subscribe({
next: value => console.log('value', value),
error: err => console.error('error', err),
complete: () => console.log('done'),
});
Skrót:
numbers$.subscribe(value => console.log(value)); // tylko next
W Angularze observerem często jest logika w serwisie lub komponencie: aktualizacja stanu, zmiana pól komponentu, wywołanie metod serwisu.
Subscription i cykl życia strumienia
Metoda .subscribe() zwraca obiekt Subscription. To uchwyt, dzięki któremu możesz zakończyć subskrypcję:
const sub = interval(1000).subscribe(v => console.log(v));
setTimeout(() => {
sub.unsubscribe(); // zatrzymuje interval
}, 5000);
Niekończące się strumienie (interval, fromEvent) wymagają świadomego domykania. W przeciwnym razie zostaną w pamięci razem z powiązanymi obiektami (wyciek pamięci).
W Angularze najczęstsze sposoby zarządzania subskrypcjami:
- async pipe w template (zamykany automatycznie),
takeUntil(destroy$)w komponentach,- ręczne
ngOnDestroyzsubscription.unsubscribe().
Przykład: kliknięcia przycisku z odsubskrybowaniem
Prosty scenariusz: loguj kliknięcia przycisku, ale tylko przez 10 sekund.
import { fromEvent, timer, takeUntil } from 'rxjs';
const button = document.querySelector('#save');
const clicks$ = fromEvent(button, 'click');
const stop$ = timer(10000); // po 10 s wyemituje 0 i się zakończy
const sub = clicks$
.pipe(takeUntil(stop$))
.subscribe(() => console.log('klik'));
// sub.unsubscribe() niepotrzebne – takeUntil zakończy strumień
Tutaj takeUntil pilnuje cyklu życia subskrypcji za nas, co jest wygodne też w komponentach.
Pierwszy dzień: operatory, bez których nie ma sensu ruszać dalej
Operatory tworzące i transformujące: map, filter, tap
Operatory transformujące to serce RxJS. Za ich pomocą opisujesz, co ma się dziać z kolejnymi wartościami w strumieniu.
map – przekształcenie wartości
map zmienia każdą wartość w inną, zachowując kolejność i liczbę emisji.
import { of, map } from 'rxjs';
of(1, 2, 3)
.pipe(map(x => x * 10))
.subscribe(console.log); // 10, 20, 30
Zastosowania:
- wyciąganie pola z obiektu (
map(user => user.name)), - formatowanie daty/tekstu,
filter – odrzucanie niepotrzebnych emisji
filter przepuszcza tylko te wartości, które spełniają warunek. Reszta znika ze strumienia.
import { of, filter } from 'rxjs';
of(1, 2, 3, 4, 5)
.pipe(filter(x => x % 2 === 0))
.subscribe(console.log); // 2, 4
Typowe użycia:
- walidacja danych wejściowych (np. długość tekstu),
- ignorowanie zdarzeń, które cię nie interesują (np. klawisze inne niż Enter),
- przepuszczanie tylko „pełnych” obiektów po walidacji schematu.
tap – efekt uboczny bez zmiany strumienia
tap pozwala podglądać wartości lub odpalać efekty uboczne bez modyfikowania strumienia.
import { of, map, tap } from 'rxjs';
of(1, 2, 3)
.pipe(
tap(v => console.log('przed map', v)),
map(v => v * 2),
tap(v => console.log('po map', v)),
)
.subscribe();
Przydaje się do logowania, prostych metryk, wywołań typu „fire-and-forget” (np. powiadomienie o zdarzeniu do zewnętrznego trackera), ale bez dotykania kształtu danych.
Operatory czasu: debounceTime, throttleTime, distinctUntilChanged
Praca z UI niemal zawsze wymaga kontroli częstotliwości zdarzeń. Kilka operatorów załatwia większość takich przypadków.
debounceTime – poczekaj, aż użytkownik skończy pisać
debounceTime(ms) czeka, aż przez dany czas nie pojawi się nowa wartość, i dopiero wtedy ją przepuszcza.
import { fromEvent, map, debounceTime } from 'rxjs';
const searchInput = document.querySelector('#search');
const search$ = fromEvent(searchInput, 'input').pipe(
map((e: any) => e.target.value),
debounceTime(300),
);
search$.subscribe(term => {
console.log('szukaj:', term);
});
Używane do „odszumiania” inputów, scrolla, resize okna – wszędzie tam, gdzie nie chcesz reagować na każdą pojedynczą zmianę.
throttleTime – reaguj co jakiś czas, resztę ignoruj
throttleTime(ms) przepuszcza pierwszą wartość, a kolejne przez zadany czas ignoruje.
import { fromEvent, throttleTime } from 'rxjs';
const btn = document.querySelector('#like');
fromEvent(btn, 'click')
.pipe(throttleTime(2000))
.subscribe(() => console.log('klik zaakceptowany'));
Dobre do zabezpieczania przycisków przed spamem, reagowania na scroll co X ms, lekkiej ochrony endpointów.
distinctUntilChanged – reaguj tylko na faktyczną zmianę
distinctUntilChanged() porównuje kolejne wartości i przepuszcza tylko te, które są inne niż poprzednia.
import { of, distinctUntilChanged } from 'rxjs';
of(1, 1, 2, 2, 2, 3)
.pipe(distinctUntilChanged())
.subscribe(console.log); // 1, 2, 3
W połączeniu z inputami zmniejsza liczbę zapytań i renderów. Przykład: filtruj wyszukiwanie tylko gdy tekst naprawdę się zmienił, a nie gdy user wpisze tę samą frazę.
Łączenie strumieni: mergeMap, switchMap, concatMap
Najwięcej zamieszania robią operatory, które z jednej wartości robią nowy strumień (np. z kliknięcia – request HTTP). To tzw. operatory „flattening”.
switchMap – idealny do wyszukiwania i autouzupełniania
switchMap bierze każdą wartość, zamienia ją na nowy observable i subskrybuje, a poprzedni porzuca.
import { fromEvent, map, debounceTime, distinctUntilChanged, switchMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';
const input = document.querySelector('#search');
fromEvent(input, 'input')
.pipe(
map((e: any) => e.target.value),
debounceTime(300),
distinctUntilChanged(),
switchMap(term =>
ajax.getJSON(`/api/search?q=${encodeURIComponent(term)}`)
),
)
.subscribe(results => {
console.log('wyniki', results);
});
Przy nowym zapytaniu poprzednie żądanie jest logicznie ignorowane. Efekt: zero wyścigów odpowiedzi, pokazujesz tylko najświeższe dane.
mergeMap – równolegle, bez odwoływania poprzednich
mergeMap (dawniej flatMap) nie odsubskrybowuje poprzednich wewnętrznych obserwabli. Każde wejście uruchamia nowy strumień, wszystkie lecą równolegle.
import { fromEvent, mergeMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';
fromEvent(document, 'click')
.pipe(
mergeMap(() => ajax.getJSON('/api/track-click')),
)
.subscribe();
Dobre do akcji, które mogą się wykonywać równolegle: logowanie zdarzeń, pobieranie niezależnych danych, upload wielu plików.
concatMap – po kolei, bez nakładania się
concatMap ustawia zlecenia w kolejce. Następne rusza dopiero po zakończeniu poprzedniego.
import { fromEvent, concatMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';
const saveBtn = document.querySelector('#save');
fromEvent(saveBtn, 'click')
.pipe(
concatMap(() => ajax.post('/api/save', { payload: '...' })),
)
.subscribe();
Sprawdza się przy operacjach, które muszą być sekwencyjne: zapisy do API, migracje, batchowe akcje administracyjne.
Kontrola zakończenia strumienia: take, takeUntil, takeWhile
Z tym zestawem można ogarnąć większość cykli życia bez ręcznego unsubscribe.
take – tylko pierwsze N wartości
take(n) przepuszcza N wartości, po czym kończy strumień.
import { interval, take } from 'rxjs';
interval(1000)
.pipe(take(3))
.subscribe(console.log); // 0,1,2 i complete
takeUntil – zakończ, gdy inny strumień coś wyemituje
takeUntil(notifier$) trzyma subskrypcję, dopóki notifier$ nie nada pierwszej wartości.
import { fromEvent, interval, takeUntil } from 'rxjs';
const stop$ = fromEvent(document, 'mouseup');
interval(500)
.pipe(takeUntil(stop$))
.subscribe(console.log);
Do komponentów w Angularze zwykle używa się destroy$ wywoływanego w ngOnDestroy.
takeWhile – dopóki warunek jest spełniony
takeWhile(predicate) przepuszcza wartości tak długo, jak warunek zwraca true.
import { interval, takeWhile } from 'rxjs';
interval(1000)
.pipe(takeWhile(v => v < 5))
.subscribe(console.log); // 0..4

Drugi dzień: Subject, BehaviorSubject i spółka – komunikacja wewnątrz aplikacji
Subject jako most między światem „push” i „pull”
Subject jest jednocześnie Observable i Observerem. Możesz na nim wywołać next(), a inni mogą się na niego subskrybować.
import { Subject } from 'rxjs';
const events$ = new Subject<string>();
events$.subscribe(v => console.log('A:', v));
events$.subscribe(v => console.log('B:', v));
events$.next('hello'); // A: hello, B: hello
Dobrze się sprawdza jako wewnętrzna „szyna zdarzeń” w serwisie lub module.
BehaviorSubject – aktualny stan + wartości przyszłe
BehaviorSubject przechowuje ostatnią wartość i natychmiast wysyła ją nowym subskrybentom.
import { BehaviorSubject } from 'rxjs';
const counter$ = new BehaviorSubject(0);
counter$.subscribe(v => console.log('sub1', v)); // sub1 0
counter$.next(1);
counter$.next(2);
counter$.subscribe(v => console.log('sub2', v)); // sub2 2
W praktyce używaj go jak prostego store’a: przechowuje aktualny stan, a komponenty obserwują zmiany.
ReplaySubject – odtwórz historię nowym subskrybentom
ReplaySubject przechowuje bufor ostatnich N emisji (lub wszystkie w wersji bez limitu) i odtwarza je nowym subskrybentom.
import { ReplaySubject } from 'rxjs';
const log$ = new ReplaySubject<string>(2); // ostatnie 2
log$.next('a');
log$.next('b');
log$.next('c');
log$.subscribe(v => console.log('sub', v)); // b, c
Przydatne, gdy później dołącza się komponent, który musi „nadrobić” kluczowe dane z przeszłości.
Subject jako bus zdarzeń między komponentami
Prosty serwis do komunikacji: jeden komponent nadaje, drugi słucha.
// notification.service.ts
import { Injectable } from '@angular/core';
import { Subject } from 'rxjs';
@Injectable({ providedIn: 'root' })
export class NotificationService {
private messagesSubject = new Subject<string>();
messages$ = this.messagesSubject.asObservable();
push(message: string) {
this.messagesSubject.next(message);
}
}
// sender.component.ts
constructor(private notification: NotificationService) {}
save() {
// ... zapis
this.notification.push('Zapisano dane');
}
// viewer.component.ts
messages$ = this.notification.messages$;
W template viewer używasz | async, żeby wyświetlać kolejne komunikaty.
Prosty store na BehaviorSubject
Dla małych aplikacji wystarczy serwis oparty o BehaviorSubject.
// cart.store.ts
import { Injectable } from '@angular/core';
import { BehaviorSubject } from 'rxjs';
export interface CartItem {
id: string;
name: string;
qty: number;
}
@Injectable({ providedIn: 'root' })
export class CartStore {
private itemsSubject = new BehaviorSubject<CartItem[]>([]);
items$ = this.itemsSubject.asObservable();
add(item: CartItem) {
const items = this.itemsSubject.value;
this.itemsSubject.next([...items, item]);
}
clear() {
this.itemsSubject.next([]);
}
}
// cart.component.ts
items$ = this.cartStore.items$;
constructor(private cartStore: CartStore) {}
addToCart(product: Product) {
this.cartStore.add({ id: product.id, name: product.name, qty: 1 });
}
Bez dodatkowych bibliotek masz prosty, reaktywny stan koszyka, gotowy do podpięcia w dowolnym miejscu aplikacji.
Subject vs BehaviorSubject: kiedy który
Kilka praktycznych zasad:
- Subject – gdy interesują cię tylko przyszłe zdarzenia (np. kliknięcia, powiadomienia „fire and forget”).
- BehaviorSubject – gdy masz stan, który ma sens tylko z „ostatnią znaną wartością” (użytkownik, koszyk, ustawienia).
- ReplaySubject – gdy nowi subskrybenci muszą poznać kawałek historii (logi, konfiguracja, kilka ostatnich wiadomości czatu).
RxJS w Angularze (i nie tylko): jak to wygląda w realnej aplikacji
HttpClient i strumienie danych
HttpClient zwraca od razu Observable, więc możesz użyć pełnej palety operatorów.
// users.service.ts
import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { map, shareReplay } from 'rxjs';
@Injectable({ providedIn: 'root' })
export class UsersService {
users$ = this.http.get<any[]>('/api/users').pipe(
map(users => users.filter(u => u.active)),
shareReplay(1),
);
constructor(private http: HttpClient) {}
}
shareReplay(1) buforuje ostatnią wartość i współdzieli subskrypcję – realnie zapytanie leci raz, reszta komponentów dostaje wynik z cache.
Async pipe zamiast ręcznego subscribe
W komponentach najlepiej jak najrzadziej wywoływać .subscribe() ręcznie. async pipe obsłuży subskrypcję i odsubskrybowanie.
// users.component.ts
users$ = this.usersService.users$;
<ul>
<li *ngFor="let user of users$ | async">
{{ user.name }}
</li>
</ul>
Mniej kodu w ngOnDestroy, mniej ryzyka wycieków.
Reaktywne formularze i valueChanges
Formularze reaktywne udostępniają strumienie zmian przez valueChanges i statusChanges.
// search-form.component.ts
import { Component, OnInit } from '@angular/core';
import { FormControl } from '@angular/forms';
import { debounceTime, distinctUntilChanged } from 'rxjs';
@Component({
selector: 'app-search-form',
template: `
<input [formControl]="searchCtrl" placeholder="Szukaj" />
`
})
export class SearchFormComponent implements OnInit {
searchCtrl = new FormControl('');
ngOnInit() {
this.searchCtrl.valueChanges
.pipe(
debounceTime(300),
distinctUntilChanged(),
)
.subscribe(term => {
console.log('zapytanie', term);
});
}
}
W większych projektach możesz przerzucić tę logikę do serwisu i w komponencie trzymać tylko bindowanie formy.
Łączenie wielu strumieni: combineLatest, withLatestFrom, merge
Gdy masz kilka źródeł danych, zaczyna się prawdziwe „reaktywne życie”. Da się je skleić bez ręcznego żonglowania stanem.
combineLatest – zawsze aktualna kombinacja
combineLatest emituje, gdy którykolwiek ze strumieni się zmieni, ale dopiero po pierwszej emisji z każdego.
import { combineLatest, map } from 'rxjs';
const price$ = this.productService.price$; // number
const qty$ = this.cartService.quantity$; // number
const total$ = combineLatest([price$, qty$]).pipe(
map(([price, qty]) => price * qty),
);
Dobry wzorzec na „computed properties” – wynik zależny od kilku źródeł.
withLatestFrom – dołącz stan pomocniczy
withLatestFrom „dociąga” ostatnie wartości z innych strumieni, ale emituje tylko, gdy źródło bazowe nada.
import { fromEvent, withLatestFrom } from 'rxjs';
const clicks$ = fromEvent(document, 'click');
const user$ = this.authService.user$;
const trackedClicks$ = clicks$.pipe(
withLatestFrom(user$),
);
// [MouseEvent, User]
Dobre, gdy akcja użytkownika ma być połączona z aktualnym stanem (np. zalogowany użytkownik, aktualny filtr).
merge – kilka źródeł, jeden kanał
merge wrzuca wszystkie emisje z kilku strumieni do jednego, bez kombinowania wartości.
import { merge } from 'rxjs';
const errorsFromApi$ = this.api.errors$;
const errorsFromValidation$ = this.form.errors$;
const allErrors$ = merge(errorsFromApi$, errorsFromValidation$);
Przydatne przy logach, powiadomieniach, „globalnym” event busie.
Obsługa błędów: catchError, retry, onErrorResumeNext
Strumień bez strategii błędów w produkcji prędzej czy później wybuchnie.
catchError – lokalna reakcja na wyjątek
catchError przechwytuje błąd i pozwala zwrócić zamienny strumień.
import { catchError, of } from 'rxjs';
this.products$ = this.http.get<Product[]>('/api/products').pipe(
catchError(err => {
console.error('Błąd pobierania', err);
return of([]); // fallback
}),
);
Dzięki temu UI działa dalej, tylko z pustą listą.
retry i retryWhen – powtórz zapytanie
Drobne fluktuacje sieci łatwo ogarnąć retry.
import { retry } from 'rxjs';
this.data$ = this.http.get('/api/data').pipe(
retry(2), // spróbuj max 3 razy (1 + 2)
);
Przy bardziej skomplikowanych scenariuszach można użyć retryWhen i dorzucić np. delay albo eksp. backoff.
onErrorResumeNext – ignoruj i jedź dalej
Gdy pojedyncze źródła mogą padać, a reszta ma działać:
import { onErrorResumeNext } from 'rxjs';
const a$ = this.http.get('/api/a');
const b$ = this.http.get('/api/b');
const safe$ = onErrorResumeNext(a$, b$);
Każdy błąd kończy tylko aktualny strumień, kolejne idą dalej.
Reaktywna architektura komponentów: dane w górę, zdarzenia w dół
Dobrze poukładane komponenty łatwo się testuje i refaktoryzuje. RxJS pomaga trzymać porządek.
„Smart” vs „dumb” komponenty
Warstwa prezentacji nie powinna znać HttpClient ani routera. Może znać tylko wejściowe Observable i emitować zdarzenia.
// users-smart.component.ts
users$ = this.usersService.users$;
onReload() {
this.usersService.reload();
}
<app-users-list
[users]="users$ | async"
(reload)="onReload()"
></app-users-list>
// users-list.component.ts
@Input() users: User[] | null = [];
@Output() reload = new EventEmitter<void>();
Najprostszy podział: RxJS i logika w „smart”, czysty szablon w „dumb”.
Strumień zdarzeń komponentu zamiast wielu @Output
Zamiast kilku @Output można mieć jeden strumień „intencji”.
// filter-bar.component.ts
export type FilterIntent =
| { type: 'search'; term: string }
| { type: 'toggleActive'; active: boolean };
@Output() intent = new EventEmitter<FilterIntent>();
onSearch(term: string) {
this.intent.emit({ type: 'search', term });
}
onToggleActive(active: boolean) {
this.intent.emit({ type: 'toggleActive', active });
}
// container
this.filterBarIntent$
.pipe(
// tu możesz mapować intencje do zapytań, patchować store, itd.
)
.subscribe();
Rzutuje to potem na spójne, reaktywne przepływy danych w całej aplikacji.
Strumień routera i guardy
Nawigacja w Angularze to też dane w czasie.
Parametry trasy jako Observable
ActivatedRoute daje strumienie params, queryParams, data.
// user-details.component.ts
user$ = this.route.paramMap.pipe(
map(params => params.get('id')),
switchMap(id => this.usersService.getUser(id!)),
);
constructor(private route: ActivatedRoute,
private usersService: UsersService) {}
Zmiana parametru URL automatycznie przeładowuje dane. Bez ręcznego nasłuchiwania.
Reaktywne guardy
Guard może zwrócić Observable<boolean>, co pozwala na złożone scenariusze autoryzacji.
// auth.guard.ts
canActivate(): Observable<boolean> {
return this.auth.user$.pipe(
map(user => !!user),
);
}
Z czasem można dorzucić tap do przekierowań, logowania itp.
Projekt weekendowy: mini-apka – założenia
Dla treningu przyda się prosta, ale kompletna aplikacja. Bez paneli admina i komplikacji.
Cel: „Lista zadań + filtry + statystyki”
Zakres:
- lista zadań z możliwością dodania, odznaczenia, usunięcia,
- filtry: wszystkie / aktywne / zakończone,
- proste statystyki: ile zadań, ile ukończonych,
- persist w
localStoragelub prostym API.
Na tym poligonie przećwiczysz większość operatorów i Subjectów z wcześniejszych sekcji.
Struktura projektu i moduły
Podział na kilka prostych warstw wystarcza:
TodoStore– logika stanu zadań (BehaviorSubject),TodoApiService– zapis/odczyt (API lub localStorage),TodoPageComponent– kontener, który skleja wszystko,- komponenty prezentacyjne: lista, pojedynczy item, panel filtrów.
Model danych
export interface Todo {
id: string;
title: string;
completed: boolean;
createdAt: string;
}
Store oparty na BehaviorSubject i operatorach
Store trzyma stan, emituje zmiany i wystawia gotowe selektory.
// todo.store.ts
import { Injectable } from '@angular/core';
import { BehaviorSubject, map } from 'rxjs';
import { Todo } from './todo.model';
export type TodoFilter = 'all' | 'active' | 'completed';
@Injectable({ providedIn: 'root' })
export class TodoStore {
private todosSubject = new BehaviorSubject<Todo[]>([]);
private filterSubject = new BehaviorSubject<TodoFilter>('all');
todos$ = this.todosSubject.asObservable();
filter$ = this.filterSubject.asObservable();
visibleTodos$ = this.todos$.pipe(
combineLatestWith(this.filter$),
map(([todos, filter]) => {
switch (filter) {
case 'active': return todos.filter(t => !t.completed);
case 'completed': return todos.filter(t => t.completed);
default: return todos;
}
}),
);
stats$ = this.todos$.pipe(
map(todos => {
const completed = todos.filter(t => t.completed).length;
return {
total: todos.length,
completed,
active: todos.length - completed,
};
}),
);
setTodos(todos: Todo[]) {
this.todosSubject.next(todos);
}
add(title: string) {
const todo: Todo = {
id: crypto.randomUUID(),
title,
completed: false,
createdAt: new Date().toISOString(),
};
this.todosSubject.next([...this.todosSubject.value, todo]);
}
toggle(id: string) {
this.todosSubject.next(
this.todosSubject.value.map(t =>
t.id === id ? { ...t, completed: !t.completed } : t,
),
);
}
remove(id: string) {
this.todosSubject.next(
this.todosSubject.value.filter(t => t.id !== id),
);
}
setFilter(filter: TodoFilter) {
this.filterSubject.next(filter);
}
}
Cały „stan” w jednym miejscu, komponenty dostają już przetworzone strumienie.
Persist w localStorage z użyciem efektu
Prosty „efekt” można zapisać jako subskrypcję w serwisie.
// todo-persistence.service.ts
import { Injectable } from '@angular/core';
import { TodoStore } from './todo.store';
import { filter, tap } from 'rxjs';
const STORAGE_KEY = 'todos';
@Injectable({ providedIn: 'root' })
export class TodoPersistenceService {
constructor(private store: TodoStore) {
const raw = localStorage.getItem(STORAGE_KEY);
if (raw) {
this.store.setTodos(JSON.parse(raw));
}
this.store.todos$
.pipe(
tap(todos => {
localStorage.setItem(STORAGE_KEY, JSON.stringify(todos));
}),
)
.subscribe();
}
}
Stan synchronizuje się z localStorage sam, bez kodu w komponentach.
Kontener strony: klejenie strumieni
Kontener zbiera Observable ze store’a i przekazuje do prezentacji.
// todo-page.component.ts
import { Component } from '@angular/core';
import { TodoStore, TodoFilter } from './todo.store';
@Component({
selector: 'app-todo-page',
templateUrl: './todo-page.component.html',
providers: [TodoStore], // opcjonalnie per-strona
})
export class TodoPageComponent {
todos$ = this.store.visibleTodos$;
stats$ = this.store.stats$;
filter$ = this.store.filter$;
constructor(private store: TodoStore) {}
add(title: string) {
this.store.add(title);
}
toggle(id: string) {
this.store.toggle(id);
}
remove(id: string) {
this.store.remove(id);
}
setFilter(filter: TodoFilter) {
this.store.setFilter(filter);
}
}
<!-- todo-page.component.html -->
<app-todo-input (add)="add($event)"></app-todo-input>
<app-todo-filters
[activeFilter]="filter$ | async"
(changeFilter)="setFilter($event)"
></app-todo-filters>
<app-todo-stats [stats]="stats$ | async"></app-todo-stats>
<app-todo-list
[todos]="todos$ | async"
(toggle)="toggle($event)"
(remove)="remove($event)"
></app-todo-list>
W szablonie tylko | async i przekazywanie zdarzeń w górę.
Komponent wejściowy sterowany Subjectem
Można dodać drobną logikę (np. debounce) bezpośrednio w komponencie z użyciem Subject.
// todo-input.component.ts
import { Component, OnDestroy } from '@angular/core';
import { Subject, debounceTime, filter, map, takeUntil } from 'rxjs';
@Component({
selector: 'app-todo-input',
template: `
<input
type="text"
[value]="value"
(input)="onInput($event)"
(keyup.enter)="onEnter()"
placeholder="Dodaj zadanie"
/>
`
})
export class TodoInputComponent implements OnDestroy {
@Output() add = new EventEmitter<string>();
private inputSubject = new Subject<string>();
private destroy$ = new Subject<void>();
value = '';
constructor() {
this.inputSubject.pipe(
debounceTime(200),
map(v => v.trim()),
takeUntil(this.destroy$),
).subscribe(v => {
this.value = v;
});
}
onInput(event: Event) {
const target = event.target as HTMLInputElement;
this.inputSubject.next(target.value);
}
onEnter() {
const title = this.value.trim();
if (!title) return;
this.add.emit(title);
this.value = '';
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}
Mechanizm Subject + takeUntil trzyma porządek w subskrypcjach także w małych komponentach.
Mini analytics: liczenie zdarzeń z merge i scan
Do apki można dorzucić prosty moduł „telemetrii” oparty o RxJS.
// analytics.service.ts
import { Injectable } from '@angular/core';
import { Subject, merge, scan, map } from 'rxjs';
type EventType = 'todo_add' | 'todo_toggle' | 'todo_remove';
@Injectable({ providedIn: 'root' })
export class AnalyticsService {
private eventsSubject = new Subject<EventType>();
events$ = this.eventsSubject.asObservable();
counters$ = this.events$.pipe(
scan((acc, type) => {
acc[type] = (acc[type] ?? 0) + 1;
return { ...acc };
}, {} as Record<EventType, number>),
);
track(type: EventType) {
this.eventsSubject.next(type);
}
}
// w TodoPageComponent
constructor(private store: TodoStore,
private analytics: AnalyticsService) {}
add(title: string) {
this.store.add(title);
this.analytics.track('todo_add');
}
toggle(id: string) {
this.store.toggle(id);
this.analytics.
Najczęściej zadawane pytania (FAQ)
Czy da się nauczyć RxJS w jeden weekend od zera?
W 2–3 dni da się ogarnąć solidne podstawy: czym jest Observable, jak działają kluczowe operatory i jak zarządzać subskrypcjami. To już wystarcza, żeby zacząć sensownie używać RxJS w interfejsie i przy pracy z API.
Nie opanujesz całego ekosystemu ani wszystkich operatorów. Na start wystarczy kilkanaście najważniejszych i kilka prostych wzorców – resztę dociągniesz w realnych projektach.
Od czego zacząć naukę RxJS jako frontend developer?
Na początku skup się na minimalnym zestawie: Observable, Observer, Subscription, podstawowe operatory (map, filter, tap, switchMap, mergeMap, debounceTime, distinctUntilChanged, catchError) oraz Subject i BehaviorSubject. To jest trzon, który przewija się w większości zastosowań.
Dobrą bazą ćwiczeń są proste przykłady z UI: input z wyszukiwaniem, przycisk ładujący dane z API, autosave formularza po chwili bezczynności. Dzięki temu szybko zobaczysz, jak RxJS porządkuje asynchroniczny bałagan.
Po co mi RxJS, skoro mam Promisy i async/await?
Promisy i async/await dobrze ogarniają pojedyncze operacje asynchroniczne, ale gorzej radzą sobie ze strumieniami zdarzeń w czasie (input, scroll, WebSocket, wiele żądań HTTP naraz). Tam zaczyna się callback hell, promise spaghetti i ręczne zarządzanie eventami.
RxJS pozwala traktować wszystkie te źródła jako strumienie, które można filtrować, łączyć i transformować operatorami. W praktyce logika staje się bardziej przewidywalna, krótsza i łatwiejsza do testowania, szczególnie gdy zdarzeń jest dużo.
Jakie są najczęstsze zastosowania RxJS w aplikacjach webowych?
Najczęściej RxJS pojawia się tam, gdzie dane zmieniają się w czasie i reagują na użytkownika: obsługa kliknięć, wpisywanie w pola formularzy, scroll, ruch myszką. Klasyk to pole wyszukiwania z debounce i dynamiczne filtrowanie list.
Drugi mocny obszar to praca z API: anulowanie starych zapytań, kolejkowanie, retry przy błędach, polling, WebSockety. Dochodzi jeszcze autosave (np. draft posta po chwili bezczynności) i dynamiczne filtrowanie/sortowanie tabel po wielu kryteriach naraz.
Czym różni się Subject od BehaviorSubject w RxJS?
Subject emituje wartości tylko „w przód” – nowy subskrybent dostaje emisje od momentu subskrypcji. BehaviorSubject zawsze przechowuje ostatnią wartość i natychmiast wysyła ją nowemu subskrybentowi.
W praktyce BehaviorSubject lepiej sprawdza się jako nośnik aktualnego stanu (np. zalogowany użytkownik, zawartość koszyka), a „zwykły” Subject jako kanał zdarzeń (np. kliknięcia, akcje użytkownika).
Jak unikać wycieków pamięci przy subskrypcjach RxJS?
Podstawowa zasada: subskrypcje trzeba sprzątać, gdy przestają być potrzebne. W Angularze najczęściej używa się async pipe albo unsubscribe w ngOnDestroy (czasem z pomocą Subjecta typu destroy$ i operatora takeUntil).
W czystym RxJS pomaga trzymanie się wzorca: jedna zmienna z Subscription lub add() na composite subscription, a przy zamykaniu komponentu/klasy wywołanie unsubscribe(). Warto też korzystać z operatorów, które same zakańczają strumień (take, takeUntil, first).
Co to znaczy, że Observable jest „hot” albo „cold”?
Cold observable zaczyna emitować wartości dopiero w momencie subskrypcji i dla każdego subskrybenta „odpala się” osobno (np. from(fetch(...)), of, from tablicy). Każdy obserwator widzi pełną sekwencję od początku.
Hot observable emituje niezależnie od subskrybentów – nowi dołączają „w trakcie filmu” (np. fromEvent, Subject, BehaviorSubject). To ma znaczenie przy projektowaniu strumieni stanu i przy współdzieleniu jednego źródła danych między wielu odbiorców.






