RxJS dla niecierpliwych: jak ogarnąć programowanie reaktywne w weekend

0
2
Rate this post

Nawigacja:

Co da ci RxJS w jeden weekend

Realistyczny zakres: ile da się opanować w 2–3 dni

Weekend wystarczy, żeby zbudować solidny fundament RxJS: rozumieć, czym jest Observable, jak działają podstawowe operatory i jak kontrolować cykl życia subskrypcji. To już pozwala pisać sensowne, reaktywne fragmenty kodu w interfejsie i przy pracy z API.

W tak krótkim czasie nie opanujesz wszystkich operatorów, złożonych wzorców architektonicznych ani całego ekosystemu. Nie musisz. W praktyce większość frontendowych zadań da się ogarnąć kilkunastoma operatorami i kilkoma prostymi wzorcami. Reszta to detale, które dojdziesz w projektach.

Po jednym intensywnym weekendzie powinieneś:

  • swobodnie tworzyć observables z eventów, tablic i HTTP,
  • używać kluczowych operatorów: map, filter, tap, switchMap, mergeMap, debounceTime, distinctUntilChanged, catchError,
  • rozumieć różnicę między Subject a BehaviorSubject,
  • opanować podstawy zarządzania subskrypcjami i unikania wycieków pamięci,
  • spiąć to w jedną, prostą mini-aplikację reaktywną.

Na później zostają tematy typu: schedulery, zaawansowane łączenie wielu strumieni, wzorce jak CQRS/Redux oparte na RxJS czy optymalizacje pod duże aplikacje.

Jak wyglądają aplikacje bez RxJS

W klasycznym kodzie JS/TS interakcje asynchroniczne są klejone ręcznie: callbacki w callbackach, gąszcz .then(), setTimeouty, event listenery rozsiane po plikach. Z czasem utrzymanie takiego kodu staje się koszmarem.

Typowe problemy bez programowania reaktywnego:

  • Callback hell – zagnieżdżone funkcje, które trudno czytać i testować.
  • Promise spaghetti – łańcuchy then/catch/finally, w które trudno wstrzyknąć dodatkową logikę (np. retry, timeout, cancel).
  • Ręczne zarządzanie eventami – każdy event listener gdzieś indziej, brak centralnej logiki łączenia i filtrowania zdarzeń.
  • Duplikacja kodu – to samo podejście do obsługi błędów, loadera, ponawiania zapytań kopiowane w wielu miejscach.

RxJS pozwala traktować to wszystko jak jeden mechanizm: strumienie zdarzeń przekształcasz operatorami, a efektem jest prostszy, bardziej przewidywalny kod.

Typowe zastosowania RxJS w codziennej pracy

RxJS wchodzi do gry wszędzie tam, gdzie dane się zmieniają w czasie lub reagują na zdarzenia użytkownika.

  • UI – reagowanie na kliknięcia, ruch myszką, przewijanie (scroll), zmiany w polach formularzy.
  • API – kolejki zapytań HTTP, anulowanie starych zapytań, debouncing wyszukiwania, retry przy błędach.
  • Autosave – zapisywanie wersji roboczej po chwili bezczynności, bez zasypywania serwera żądaniami.
  • Filtrowanie i sortowanie – dynamiczne listy i tabele reagujące na kombinację wielu filtrów.
  • WebSocket / SSE – strumień danych „na żywo”, np. notyfikacje, czaty, wykresy.

Jeśli tworzysz front w Angularze, RxJS spotykasz niemal wszędzie: HttpClient, formularze reaktywne, Router, async pipe w szablonach.

Minimalny zestaw pojęć na start

Żeby nie utknąć na podstawach, w weekend ogarnij te elementy:

  • Observable – strumień emitujący wartości w czasie.
  • Observer – obiekt/funkcja, która odbiera wartości (next, error, complete).
  • Subscription – uchwyt do aktywnej subskrypcji, pozwala ją przerwać.
  • Operatory – funkcje przekształcające strumienie (np. map, filter, switchMap).
  • Subject/BehaviorSubject – aktywne źródła danych, do których sam możesz „wypychać” wartości.
  • Hot vs cold – różnica między strumieniem, który zaczyna emitować dopiero przy subskrypcji, a takim, który emituje niezależnie od subskrybentów.

Ten pakiet wystarcza, by swobodnie poruszać się w prostych i średnio złożonych scenariuszach RxJS w aplikacjach webowych.

O co chodzi w programowaniu reaktywnym (bez filozofii)

Wartości w czasie zamiast pojedynczej wartości

Klasyczna zmienna to fotografia: jedna wartość w danej chwili. Strumień to film: sekwencja wartości w czasie. Programowanie reaktywne skupia się na pracy z tym filmem zamiast z pojedynczym kadrem.

W RxJS Observable to właśnie taka „wartość w czasie”. Nie mówisz „daj mi wartość teraz”, tylko „powiedz mi o każdej nowej wartości, która się pojawi”. Ustalasz, jak reagujesz, gdy ta wartość się zmienia.

Dla przykładu: wpisywana fraza wyszukiwania to nie jedna wartość, tylko strumień kolejnych tekstów po każdym naciśnięciu klawisza. RxJS pozwala ten strumień:

  • przefiltrować (np. ignorować długość < 3),
  • odszumić (debounce),
  • przekształcić w zapytania HTTP,
  • złączyć wyniki z innymi strumieniami stanu (loader, błędy).

Imperatywny kod vs podejście reaktywne

Imperatywnie piszesz „krok po kroku”: pobierz, poczekaj, zrób coś, wyświetl. Kod jest sekwencją instrukcji. Gdy pojawia się wiele źródeł zdarzeń (user, API, WebSocket), logika zaczyna się przeplatać i mieszać.

Reaktywnie opisujesz zależności: dane A zależą od B i C, a UI reaguje na zmiany A. Reakcje są deklaratywne – opisujesz co ma się dziać, gdy napływają kolejne wartości.

Zamiast:

button.addEventListener('click', () => {
  fetchData().then(result => {
    render(result);
  }).catch(showError);
});

budujesz strumień kliknięć i przepuszczasz go przez operatory, łącząc z HTTP i UI. Efekt jest często krótszy, czytelniejszy i łatwiejszy do testowania.

Excel jako intuicyjna analogia

Excel jest dobrym obrazem programowania reaktywnego. Komórka z formułą =A1+B1 „słucha” zmian w A1 i B1. Gdy zmienisz A1, arkusz automatycznie przelicza wynik, bez twojej ręcznej ingerencji.

W kodzie reaktywnym tworzysz „komórki” zależne od innych strumieni: np. total$ zależne od items$ i discount$. Gdy którykolwiek z nich się zmieni, nowa wartość totalu pojawi się automatycznie.

To dokładnie to samo: deklarujesz zależności, a system dba o propagację zmian. RxJS jest takim „silnikiem Excela” dla twojej aplikacji.

Skąd biorą się strumienie w aplikacji frontowej

Najczęstsze źródła strumieni w przeglądarce:

  • Zdarzenia DOM – kliknięcia, input, scroll, keyup, mousemove.
  • HTTP – odpowiedzi z API, ponawiane żądania, polling.
  • WebSocket / SSE – wiadomości przychodzące z serwera w czasie rzeczywistym.
  • Czas – interwały, timeouty, zegary, animacje.
  • Wewnętrzny stan aplikacji – np. aktualny użytkownik, koszyk, uprawnienia.

RxJS pozwala zamienić każdy z tych obszarów na Observable, a potem spokojnie je składać, filtrować i transformować.

Zbliżenie ekranu z kodem HTML i JavaScript w edytorze Visual Studio Code
Źródło: Pexels | Autor: Antonio Batinić

Rdzeń RxJS w praktyce: Observable, Observer, Subscription

Observable: źródło, które pcha dane

Observable to obiekt reprezentujący strumień danych. Emisja odbywa się przez trzy kanały:

  • next(value) – nowa wartość,
  • error(err) – błąd kończący strumień,
  • complete() – normalne zakończenie strumienia.

Tworzenie własnego observable:

import { Observable } from 'rxjs';

const my$ = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
});

W praktyce częściej sięgasz po gotowe „fabryki” observabli zamiast pisać je od zera.

Fabryki observable: of, from, fromEvent, interval

Najczęściej używane funkcje tworzące strumienie:

  • of(…values) – emituje podane wartości i natychmiast się kończy:
import { of } from 'rxjs';

const numbers$ = of(1, 2, 3); // 1, 2, 3, complete
  • from(iterable/promise) – zamienia tablicę, mapę, Promise na observable:
import { from } from 'rxjs';

const arr$ = from([1, 2, 3]);  // 1, 2, 3, complete
const promise$ = from(fetch('/api/data')); // odpowiedź jako wartość
  • fromEvent(target, eventName) – strumień zdarzeń DOM lub innych emitterów:
import { fromEvent } from 'rxjs';

const clicks$ = fromEvent(button, 'click');
  • interval(ms) – nieskończony strumień liczb (0, 1, 2, …) co zadany interwał:
import { interval } from 'rxjs';

const tick$ = interval(1000); // 0,1,2,... co sekundę

Observer: jak odbierać next, error i complete

Subskrypcja to przekazanie observera do observable. Observer to obiekt z trzema funkcjami: next, error, complete. Można też podać tylko funkcję next.

const subscription = numbers$.subscribe({
  next: value => console.log('value', value),
  error: err => console.error('error', err),
  complete: () => console.log('done'),
});

Skrót:

numbers$.subscribe(value => console.log(value)); // tylko next

W Angularze observerem często jest logika w serwisie lub komponencie: aktualizacja stanu, zmiana pól komponentu, wywołanie metod serwisu.

Subscription i cykl życia strumienia

Metoda .subscribe() zwraca obiekt Subscription. To uchwyt, dzięki któremu możesz zakończyć subskrypcję:

const sub = interval(1000).subscribe(v => console.log(v));

setTimeout(() => {
  sub.unsubscribe(); // zatrzymuje interval
}, 5000);

Niekończące się strumienie (interval, fromEvent) wymagają świadomego domykania. W przeciwnym razie zostaną w pamięci razem z powiązanymi obiektami (wyciek pamięci).

W Angularze najczęstsze sposoby zarządzania subskrypcjami:

  • async pipe w template (zamykany automatycznie),
  • takeUntil(destroy$) w komponentach,
  • ręczne ngOnDestroy z subscription.unsubscribe().

Przykład: kliknięcia przycisku z odsubskrybowaniem

Prosty scenariusz: loguj kliknięcia przycisku, ale tylko przez 10 sekund.

import { fromEvent, timer, takeUntil } from 'rxjs';

const button = document.querySelector('#save');
const clicks$ = fromEvent(button, 'click');
const stop$ = timer(10000); // po 10 s wyemituje 0 i się zakończy

const sub = clicks$
  .pipe(takeUntil(stop$))
  .subscribe(() => console.log('klik'));

// sub.unsubscribe() niepotrzebne – takeUntil zakończy strumień

Tutaj takeUntil pilnuje cyklu życia subskrypcji za nas, co jest wygodne też w komponentach.

Pierwszy dzień: operatory, bez których nie ma sensu ruszać dalej

Operatory tworzące i transformujące: map, filter, tap

Operatory transformujące to serce RxJS. Za ich pomocą opisujesz, co ma się dziać z kolejnymi wartościami w strumieniu.

map – przekształcenie wartości

map zmienia każdą wartość w inną, zachowując kolejność i liczbę emisji.

import { of, map } from 'rxjs';

of(1, 2, 3)
  .pipe(map(x => x * 10))
  .subscribe(console.log); // 10, 20, 30

Zastosowania:

  • wyciąganie pola z obiektu (map(user => user.name)),
  • formatowanie daty/tekstu,
  • filter – odrzucanie niepotrzebnych emisji

    filter przepuszcza tylko te wartości, które spełniają warunek. Reszta znika ze strumienia.

    import { of, filter } from 'rxjs';
    
    of(1, 2, 3, 4, 5)
      .pipe(filter(x => x % 2 === 0))
      .subscribe(console.log); // 2, 4
    

    Typowe użycia:

  • walidacja danych wejściowych (np. długość tekstu),
  • ignorowanie zdarzeń, które cię nie interesują (np. klawisze inne niż Enter),
  • przepuszczanie tylko „pełnych” obiektów po walidacji schematu.

tap – efekt uboczny bez zmiany strumienia

tap pozwala podglądać wartości lub odpalać efekty uboczne bez modyfikowania strumienia.

import { of, map, tap } from 'rxjs';

of(1, 2, 3)
  .pipe(
    tap(v => console.log('przed map', v)),
    map(v => v * 2),
    tap(v => console.log('po map', v)),
  )
  .subscribe();

Przydaje się do logowania, prostych metryk, wywołań typu „fire-and-forget” (np. powiadomienie o zdarzeniu do zewnętrznego trackera), ale bez dotykania kształtu danych.

Operatory czasu: debounceTime, throttleTime, distinctUntilChanged

Praca z UI niemal zawsze wymaga kontroli częstotliwości zdarzeń. Kilka operatorów załatwia większość takich przypadków.

debounceTime – poczekaj, aż użytkownik skończy pisać

debounceTime(ms) czeka, aż przez dany czas nie pojawi się nowa wartość, i dopiero wtedy ją przepuszcza.

import { fromEvent, map, debounceTime } from 'rxjs';

const searchInput = document.querySelector('#search');

const search$ = fromEvent(searchInput, 'input').pipe(
  map((e: any) => e.target.value),
  debounceTime(300),
);

search$.subscribe(term => {
  console.log('szukaj:', term);
});

Używane do „odszumiania” inputów, scrolla, resize okna – wszędzie tam, gdzie nie chcesz reagować na każdą pojedynczą zmianę.

throttleTime – reaguj co jakiś czas, resztę ignoruj

throttleTime(ms) przepuszcza pierwszą wartość, a kolejne przez zadany czas ignoruje.

import { fromEvent, throttleTime } from 'rxjs';

const btn = document.querySelector('#like');

fromEvent(btn, 'click')
  .pipe(throttleTime(2000))
  .subscribe(() => console.log('klik zaakceptowany'));

Dobre do zabezpieczania przycisków przed spamem, reagowania na scroll co X ms, lekkiej ochrony endpointów.

distinctUntilChanged – reaguj tylko na faktyczną zmianę

distinctUntilChanged() porównuje kolejne wartości i przepuszcza tylko te, które są inne niż poprzednia.

import { of, distinctUntilChanged } from 'rxjs';

of(1, 1, 2, 2, 2, 3)
  .pipe(distinctUntilChanged())
  .subscribe(console.log); // 1, 2, 3

W połączeniu z inputami zmniejsza liczbę zapytań i renderów. Przykład: filtruj wyszukiwanie tylko gdy tekst naprawdę się zmienił, a nie gdy user wpisze tę samą frazę.

Łączenie strumieni: mergeMap, switchMap, concatMap

Najwięcej zamieszania robią operatory, które z jednej wartości robią nowy strumień (np. z kliknięcia – request HTTP). To tzw. operatory „flattening”.

switchMap – idealny do wyszukiwania i autouzupełniania

switchMap bierze każdą wartość, zamienia ją na nowy observable i subskrybuje, a poprzedni porzuca.

import { fromEvent, map, debounceTime, distinctUntilChanged, switchMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';

const input = document.querySelector('#search');

fromEvent(input, 'input')
  .pipe(
    map((e: any) => e.target.value),
    debounceTime(300),
    distinctUntilChanged(),
    switchMap(term =>
      ajax.getJSON(`/api/search?q=${encodeURIComponent(term)}`)
    ),
  )
  .subscribe(results => {
    console.log('wyniki', results);
  });

Przy nowym zapytaniu poprzednie żądanie jest logicznie ignorowane. Efekt: zero wyścigów odpowiedzi, pokazujesz tylko najświeższe dane.

mergeMap – równolegle, bez odwoływania poprzednich

mergeMap (dawniej flatMap) nie odsubskrybowuje poprzednich wewnętrznych obserwabli. Każde wejście uruchamia nowy strumień, wszystkie lecą równolegle.

import { fromEvent, mergeMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';

fromEvent(document, 'click')
  .pipe(
    mergeMap(() => ajax.getJSON('/api/track-click')),
  )
  .subscribe();

Dobre do akcji, które mogą się wykonywać równolegle: logowanie zdarzeń, pobieranie niezależnych danych, upload wielu plików.

concatMap – po kolei, bez nakładania się

concatMap ustawia zlecenia w kolejce. Następne rusza dopiero po zakończeniu poprzedniego.

import { fromEvent, concatMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';

const saveBtn = document.querySelector('#save');

fromEvent(saveBtn, 'click')
  .pipe(
    concatMap(() => ajax.post('/api/save', { payload: '...' })),
  )
  .subscribe();

Sprawdza się przy operacjach, które muszą być sekwencyjne: zapisy do API, migracje, batchowe akcje administracyjne.

Kontrola zakończenia strumienia: take, takeUntil, takeWhile

Z tym zestawem można ogarnąć większość cykli życia bez ręcznego unsubscribe.

take – tylko pierwsze N wartości

take(n) przepuszcza N wartości, po czym kończy strumień.

import { interval, take } from 'rxjs';

interval(1000)
  .pipe(take(3))
  .subscribe(console.log); // 0,1,2 i complete

takeUntil – zakończ, gdy inny strumień coś wyemituje

takeUntil(notifier$) trzyma subskrypcję, dopóki notifier$ nie nada pierwszej wartości.

import { fromEvent, interval, takeUntil } from 'rxjs';

const stop$ = fromEvent(document, 'mouseup');

interval(500)
  .pipe(takeUntil(stop$))
  .subscribe(console.log);

Do komponentów w Angularze zwykle używa się destroy$ wywoływanego w ngOnDestroy.

takeWhile – dopóki warunek jest spełniony

takeWhile(predicate) przepuszcza wartości tak długo, jak warunek zwraca true.

import { interval, takeWhile } from 'rxjs';

interval(1000)
  .pipe(takeWhile(v => v < 5))
  .subscribe(console.log); // 0..4
Ekran komputera z kodem JavaScript, odbicia na ciemnym monitorze
Źródło: Pexels | Autor: Daniil Komov

Drugi dzień: Subject, BehaviorSubject i spółka – komunikacja wewnątrz aplikacji

Subject jako most między światem „push” i „pull”

Subject jest jednocześnie Observable i Observerem. Możesz na nim wywołać next(), a inni mogą się na niego subskrybować.

import { Subject } from 'rxjs';

const events$ = new Subject<string>();

events$.subscribe(v => console.log('A:', v));
events$.subscribe(v => console.log('B:', v));

events$.next('hello'); // A: hello, B: hello

Dobrze się sprawdza jako wewnętrzna „szyna zdarzeń” w serwisie lub module.

BehaviorSubject – aktualny stan + wartości przyszłe

BehaviorSubject przechowuje ostatnią wartość i natychmiast wysyła ją nowym subskrybentom.

import { BehaviorSubject } from 'rxjs';

const counter$ = new BehaviorSubject(0);

counter$.subscribe(v => console.log('sub1', v)); // sub1 0

counter$.next(1);
counter$.next(2);

counter$.subscribe(v => console.log('sub2', v)); // sub2 2

W praktyce używaj go jak prostego store’a: przechowuje aktualny stan, a komponenty obserwują zmiany.

ReplaySubject – odtwórz historię nowym subskrybentom

ReplaySubject przechowuje bufor ostatnich N emisji (lub wszystkie w wersji bez limitu) i odtwarza je nowym subskrybentom.

import { ReplaySubject } from 'rxjs';

const log$ = new ReplaySubject<string>(2); // ostatnie 2

log$.next('a');
log$.next('b');
log$.next('c');

log$.subscribe(v => console.log('sub', v)); // b, c

Przydatne, gdy później dołącza się komponent, który musi „nadrobić” kluczowe dane z przeszłości.

Subject jako bus zdarzeń między komponentami

Prosty serwis do komunikacji: jeden komponent nadaje, drugi słucha.

// notification.service.ts
import { Injectable } from '@angular/core';
import { Subject } from 'rxjs';

@Injectable({ providedIn: 'root' })
export class NotificationService {
  private messagesSubject = new Subject<string>();
  messages$ = this.messagesSubject.asObservable();

  push(message: string) {
    this.messagesSubject.next(message);
  }
}
// sender.component.ts
constructor(private notification: NotificationService) {}

save() {
  // ... zapis
  this.notification.push('Zapisano dane');
}
// viewer.component.ts
messages$ = this.notification.messages$;

W template viewer używasz | async, żeby wyświetlać kolejne komunikaty.

Prosty store na BehaviorSubject

Dla małych aplikacji wystarczy serwis oparty o BehaviorSubject.

// cart.store.ts
import { Injectable } from '@angular/core';
import { BehaviorSubject } from 'rxjs';

export interface CartItem {
  id: string;
  name: string;
  qty: number;
}

@Injectable({ providedIn: 'root' })
export class CartStore {
  private itemsSubject = new BehaviorSubject<CartItem[]>([]);
  items$ = this.itemsSubject.asObservable();

  add(item: CartItem) {
    const items = this.itemsSubject.value;
    this.itemsSubject.next([...items, item]);
  }

  clear() {
    this.itemsSubject.next([]);
  }
}
// cart.component.ts
items$ = this.cartStore.items$;

constructor(private cartStore: CartStore) {}

addToCart(product: Product) {
  this.cartStore.add({ id: product.id, name: product.name, qty: 1 });
}

Bez dodatkowych bibliotek masz prosty, reaktywny stan koszyka, gotowy do podpięcia w dowolnym miejscu aplikacji.

Subject vs BehaviorSubject: kiedy który

Kilka praktycznych zasad:

  • Subject – gdy interesują cię tylko przyszłe zdarzenia (np. kliknięcia, powiadomienia „fire and forget”).
  • BehaviorSubject – gdy masz stan, który ma sens tylko z „ostatnią znaną wartością” (użytkownik, koszyk, ustawienia).
  • ReplaySubject – gdy nowi subskrybenci muszą poznać kawałek historii (logi, konfiguracja, kilka ostatnich wiadomości czatu).

RxJS w Angularze (i nie tylko): jak to wygląda w realnej aplikacji

HttpClient i strumienie danych

HttpClient zwraca od razu Observable, więc możesz użyć pełnej palety operatorów.

// users.service.ts
import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { map, shareReplay } from 'rxjs';

@Injectable({ providedIn: 'root' })
export class UsersService {
  users$ = this.http.get<any[]>('/api/users').pipe(
    map(users => users.filter(u => u.active)),
    shareReplay(1),
  );

  constructor(private http: HttpClient) {}
}

shareReplay(1) buforuje ostatnią wartość i współdzieli subskrypcję – realnie zapytanie leci raz, reszta komponentów dostaje wynik z cache.

Async pipe zamiast ręcznego subscribe

W komponentach najlepiej jak najrzadziej wywoływać .subscribe() ręcznie. async pipe obsłuży subskrypcję i odsubskrybowanie.

// users.component.ts
users$ = this.usersService.users$;
<ul>
  <li *ngFor="let user of users$ | async">
    {{ user.name }}
  </li>
</ul>

Mniej kodu w ngOnDestroy, mniej ryzyka wycieków.

Reaktywne formularze i valueChanges

Formularze reaktywne udostępniają strumienie zmian przez valueChanges i statusChanges.

// search-form.component.ts
import { Component, OnInit } from '@angular/core';
import { FormControl } from '@angular/forms';
import { debounceTime, distinctUntilChanged } from 'rxjs';

@Component({
  selector: 'app-search-form',
  template: `
    <input [formControl]="searchCtrl" placeholder="Szukaj" />
  `
})
export class SearchFormComponent implements OnInit {
  searchCtrl = new FormControl('');

  ngOnInit() {
    this.searchCtrl.valueChanges
      .pipe(
        debounceTime(300),
        distinctUntilChanged(),
      )
      .subscribe(term => {
        console.log('zapytanie', term);
      });
  }
}

W większych projektach możesz przerzucić tę logikę do serwisu i w komponencie trzymać tylko bindowanie formy.

Łączenie wielu strumieni: combineLatest, withLatestFrom, merge

Gdy masz kilka źródeł danych, zaczyna się prawdziwe „reaktywne życie”. Da się je skleić bez ręcznego żonglowania stanem.

combineLatest – zawsze aktualna kombinacja

combineLatest emituje, gdy którykolwiek ze strumieni się zmieni, ale dopiero po pierwszej emisji z każdego.

import { combineLatest, map } from 'rxjs';

const price$ = this.productService.price$;      // number
const qty$   = this.cartService.quantity$;     // number

const total$ = combineLatest([price$, qty$]).pipe(
  map(([price, qty]) => price * qty),
);

Dobry wzorzec na „computed properties” – wynik zależny od kilku źródeł.

withLatestFrom – dołącz stan pomocniczy

withLatestFrom „dociąga” ostatnie wartości z innych strumieni, ale emituje tylko, gdy źródło bazowe nada.

import { fromEvent, withLatestFrom } from 'rxjs';

const clicks$ = fromEvent(document, 'click');
const user$   = this.authService.user$;

const trackedClicks$ = clicks$.pipe(
  withLatestFrom(user$),
);
// [MouseEvent, User]

Dobre, gdy akcja użytkownika ma być połączona z aktualnym stanem (np. zalogowany użytkownik, aktualny filtr).

merge – kilka źródeł, jeden kanał

merge wrzuca wszystkie emisje z kilku strumieni do jednego, bez kombinowania wartości.

import { merge } from 'rxjs';

const errorsFromApi$ = this.api.errors$;
const errorsFromValidation$ = this.form.errors$;

const allErrors$ = merge(errorsFromApi$, errorsFromValidation$);

Przydatne przy logach, powiadomieniach, „globalnym” event busie.

Obsługa błędów: catchError, retry, onErrorResumeNext

Strumień bez strategii błędów w produkcji prędzej czy później wybuchnie.

catchError – lokalna reakcja na wyjątek

catchError przechwytuje błąd i pozwala zwrócić zamienny strumień.

import { catchError, of } from 'rxjs';

this.products$ = this.http.get<Product[]>('/api/products').pipe(
  catchError(err => {
    console.error('Błąd pobierania', err);
    return of([]); // fallback
  }),
);

Dzięki temu UI działa dalej, tylko z pustą listą.

retry i retryWhen – powtórz zapytanie

Drobne fluktuacje sieci łatwo ogarnąć retry.

import { retry } from 'rxjs';

this.data$ = this.http.get('/api/data').pipe(
  retry(2), // spróbuj max 3 razy (1 + 2)
);

Przy bardziej skomplikowanych scenariuszach można użyć retryWhen i dorzucić np. delay albo eksp. backoff.

onErrorResumeNext – ignoruj i jedź dalej

Gdy pojedyncze źródła mogą padać, a reszta ma działać:

import { onErrorResumeNext } from 'rxjs';

const a$ = this.http.get('/api/a');
const b$ = this.http.get('/api/b');

const safe$ = onErrorResumeNext(a$, b$);

Każdy błąd kończy tylko aktualny strumień, kolejne idą dalej.

Reaktywna architektura komponentów: dane w górę, zdarzenia w dół

Dobrze poukładane komponenty łatwo się testuje i refaktoryzuje. RxJS pomaga trzymać porządek.

„Smart” vs „dumb” komponenty

Warstwa prezentacji nie powinna znać HttpClient ani routera. Może znać tylko wejściowe Observable i emitować zdarzenia.

// users-smart.component.ts
users$ = this.usersService.users$;

onReload() {
  this.usersService.reload();
}
<app-users-list
  [users]="users$ | async"
  (reload)="onReload()"
></app-users-list>
// users-list.component.ts
@Input() users: User[] | null = [];
@Output() reload = new EventEmitter<void>();

Najprostszy podział: RxJS i logika w „smart”, czysty szablon w „dumb”.

Strumień zdarzeń komponentu zamiast wielu @Output

Zamiast kilku @Output można mieć jeden strumień „intencji”.

// filter-bar.component.ts
export type FilterIntent =
  | { type: 'search'; term: string }
  | { type: 'toggleActive'; active: boolean };

@Output() intent = new EventEmitter<FilterIntent>();

onSearch(term: string) {
  this.intent.emit({ type: 'search', term });
}

onToggleActive(active: boolean) {
  this.intent.emit({ type: 'toggleActive', active });
}
// container
this.filterBarIntent$
  .pipe(
    // tu możesz mapować intencje do zapytań, patchować store, itd.
  )
  .subscribe();

Rzutuje to potem na spójne, reaktywne przepływy danych w całej aplikacji.

Strumień routera i guardy

Nawigacja w Angularze to też dane w czasie.

Parametry trasy jako Observable

ActivatedRoute daje strumienie params, queryParams, data.

// user-details.component.ts
user$ = this.route.paramMap.pipe(
  map(params => params.get('id')),
  switchMap(id => this.usersService.getUser(id!)),
);

constructor(private route: ActivatedRoute,
            private usersService: UsersService) {}

Zmiana parametru URL automatycznie przeładowuje dane. Bez ręcznego nasłuchiwania.

Reaktywne guardy

Guard może zwrócić Observable<boolean>, co pozwala na złożone scenariusze autoryzacji.

// auth.guard.ts
canActivate(): Observable<boolean> {
  return this.auth.user$.pipe(
    map(user => !!user),
  );
}

Z czasem można dorzucić tap do przekierowań, logowania itp.

Projekt weekendowy: mini-apka – założenia

Dla treningu przyda się prosta, ale kompletna aplikacja. Bez paneli admina i komplikacji.

Cel: „Lista zadań + filtry + statystyki”

Zakres:

  • lista zadań z możliwością dodania, odznaczenia, usunięcia,
  • filtry: wszystkie / aktywne / zakończone,
  • proste statystyki: ile zadań, ile ukończonych,
  • persist w localStorage lub prostym API.

Na tym poligonie przećwiczysz większość operatorów i Subjectów z wcześniejszych sekcji.

Struktura projektu i moduły

Podział na kilka prostych warstw wystarcza:

  • TodoStore – logika stanu zadań (BehaviorSubject),
  • TodoApiService – zapis/odczyt (API lub localStorage),
  • TodoPageComponent – kontener, który skleja wszystko,
  • komponenty prezentacyjne: lista, pojedynczy item, panel filtrów.

Model danych

export interface Todo {
  id: string;
  title: string;
  completed: boolean;
  createdAt: string;
}

Store oparty na BehaviorSubject i operatorach

Store trzyma stan, emituje zmiany i wystawia gotowe selektory.

// todo.store.ts
import { Injectable } from '@angular/core';
import { BehaviorSubject, map } from 'rxjs';
import { Todo } from './todo.model';

export type TodoFilter = 'all' | 'active' | 'completed';

@Injectable({ providedIn: 'root' })
export class TodoStore {
  private todosSubject = new BehaviorSubject<Todo[]>([]);
  private filterSubject = new BehaviorSubject<TodoFilter>('all');

  todos$ = this.todosSubject.asObservable();
  filter$ = this.filterSubject.asObservable();

  visibleTodos$ = this.todos$.pipe(
    combineLatestWith(this.filter$),
    map(([todos, filter]) => {
      switch (filter) {
        case 'active': return todos.filter(t => !t.completed);
        case 'completed': return todos.filter(t => t.completed);
        default: return todos;
      }
    }),
  );

  stats$ = this.todos$.pipe(
    map(todos => {
      const completed = todos.filter(t => t.completed).length;
      return {
        total: todos.length,
        completed,
        active: todos.length - completed,
      };
    }),
  );

  setTodos(todos: Todo[]) {
    this.todosSubject.next(todos);
  }

  add(title: string) {
    const todo: Todo = {
      id: crypto.randomUUID(),
      title,
      completed: false,
      createdAt: new Date().toISOString(),
    };
    this.todosSubject.next([...this.todosSubject.value, todo]);
  }

  toggle(id: string) {
    this.todosSubject.next(
      this.todosSubject.value.map(t =>
        t.id === id ? { ...t, completed: !t.completed } : t,
      ),
    );
  }

  remove(id: string) {
    this.todosSubject.next(
      this.todosSubject.value.filter(t => t.id !== id),
    );
  }

  setFilter(filter: TodoFilter) {
    this.filterSubject.next(filter);
  }
}

Cały „stan” w jednym miejscu, komponenty dostają już przetworzone strumienie.

Persist w localStorage z użyciem efektu

Prosty „efekt” można zapisać jako subskrypcję w serwisie.

// todo-persistence.service.ts
import { Injectable } from '@angular/core';
import { TodoStore } from './todo.store';
import { filter, tap } from 'rxjs';

const STORAGE_KEY = 'todos';

@Injectable({ providedIn: 'root' })
export class TodoPersistenceService {
  constructor(private store: TodoStore) {
    const raw = localStorage.getItem(STORAGE_KEY);
    if (raw) {
      this.store.setTodos(JSON.parse(raw));
    }

    this.store.todos$
      .pipe(
        tap(todos => {
          localStorage.setItem(STORAGE_KEY, JSON.stringify(todos));
        }),
      )
      .subscribe();
  }
}

Stan synchronizuje się z localStorage sam, bez kodu w komponentach.

Kontener strony: klejenie strumieni

Kontener zbiera Observable ze store’a i przekazuje do prezentacji.

// todo-page.component.ts
import { Component } from '@angular/core';
import { TodoStore, TodoFilter } from './todo.store';

@Component({
  selector: 'app-todo-page',
  templateUrl: './todo-page.component.html',
  providers: [TodoStore], // opcjonalnie per-strona
})
export class TodoPageComponent {
  todos$ = this.store.visibleTodos$;
  stats$ = this.store.stats$;
  filter$ = this.store.filter$;

  constructor(private store: TodoStore) {}

  add(title: string) {
    this.store.add(title);
  }

  toggle(id: string) {
    this.store.toggle(id);
  }

  remove(id: string) {
    this.store.remove(id);
  }

  setFilter(filter: TodoFilter) {
    this.store.setFilter(filter);
  }
}
<!-- todo-page.component.html -->
<app-todo-input (add)="add($event)"></app-todo-input>

<app-todo-filters
  [activeFilter]="filter$ | async"
  (changeFilter)="setFilter($event)"
></app-todo-filters>

<app-todo-stats [stats]="stats$ | async"></app-todo-stats>

<app-todo-list
  [todos]="todos$ | async"
  (toggle)="toggle($event)"
  (remove)="remove($event)"
></app-todo-list>

W szablonie tylko | async i przekazywanie zdarzeń w górę.

Komponent wejściowy sterowany Subjectem

Można dodać drobną logikę (np. debounce) bezpośrednio w komponencie z użyciem Subject.

// todo-input.component.ts
import { Component, OnDestroy } from '@angular/core';
import { Subject, debounceTime, filter, map, takeUntil } from 'rxjs';

@Component({
  selector: 'app-todo-input',
  template: `
    <input
      type="text"
      [value]="value"
      (input)="onInput($event)"
      (keyup.enter)="onEnter()"
      placeholder="Dodaj zadanie"
    />
  `
})
export class TodoInputComponent implements OnDestroy {
  @Output() add = new EventEmitter<string>();

  private inputSubject = new Subject<string>();
  private destroy$ = new Subject<void>();

  value = '';

  constructor() {
    this.inputSubject.pipe(
      debounceTime(200),
      map(v => v.trim()),
      takeUntil(this.destroy$),
    ).subscribe(v => {
      this.value = v;
    });
  }

  onInput(event: Event) {
    const target = event.target as HTMLInputElement;
    this.inputSubject.next(target.value);
  }

  onEnter() {
    const title = this.value.trim();
    if (!title) return;
    this.add.emit(title);
    this.value = '';
  }

  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

Mechanizm Subject + takeUntil trzyma porządek w subskrypcjach także w małych komponentach.

Mini analytics: liczenie zdarzeń z merge i scan

Do apki można dorzucić prosty moduł „telemetrii” oparty o RxJS.

// analytics.service.ts
import { Injectable } from '@angular/core';
import { Subject, merge, scan, map } from 'rxjs';

type EventType = 'todo_add' | 'todo_toggle' | 'todo_remove';

@Injectable({ providedIn: 'root' })
export class AnalyticsService {
  private eventsSubject = new Subject<EventType>();
  events$ = this.eventsSubject.asObservable();

  counters$ = this.events$.pipe(
    scan((acc, type) => {
      acc[type] = (acc[type] ?? 0) + 1;
      return { ...acc };
    }, {} as Record<EventType, number>),
  );

  track(type: EventType) {
    this.eventsSubject.next(type);
  }
}
// w TodoPageComponent
constructor(private store: TodoStore,
            private analytics: AnalyticsService) {}

add(title: string) {
  this.store.add(title);
  this.analytics.track('todo_add');
}

toggle(id: string) {
  this.store.toggle(id);
  this.analytics.

Najczęściej zadawane pytania (FAQ)

Czy da się nauczyć RxJS w jeden weekend od zera?

W 2–3 dni da się ogarnąć solidne podstawy: czym jest Observable, jak działają kluczowe operatory i jak zarządzać subskrypcjami. To już wystarcza, żeby zacząć sensownie używać RxJS w interfejsie i przy pracy z API.

Nie opanujesz całego ekosystemu ani wszystkich operatorów. Na start wystarczy kilkanaście najważniejszych i kilka prostych wzorców – resztę dociągniesz w realnych projektach.

Od czego zacząć naukę RxJS jako frontend developer?

Na początku skup się na minimalnym zestawie: Observable, Observer, Subscription, podstawowe operatory (map, filter, tap, switchMap, mergeMap, debounceTime, distinctUntilChanged, catchError) oraz Subject i BehaviorSubject. To jest trzon, który przewija się w większości zastosowań.

Dobrą bazą ćwiczeń są proste przykłady z UI: input z wyszukiwaniem, przycisk ładujący dane z API, autosave formularza po chwili bezczynności. Dzięki temu szybko zobaczysz, jak RxJS porządkuje asynchroniczny bałagan.

Po co mi RxJS, skoro mam Promisy i async/await?

Promisy i async/await dobrze ogarniają pojedyncze operacje asynchroniczne, ale gorzej radzą sobie ze strumieniami zdarzeń w czasie (input, scroll, WebSocket, wiele żądań HTTP naraz). Tam zaczyna się callback hell, promise spaghetti i ręczne zarządzanie eventami.

RxJS pozwala traktować wszystkie te źródła jako strumienie, które można filtrować, łączyć i transformować operatorami. W praktyce logika staje się bardziej przewidywalna, krótsza i łatwiejsza do testowania, szczególnie gdy zdarzeń jest dużo.

Jakie są najczęstsze zastosowania RxJS w aplikacjach webowych?

Najczęściej RxJS pojawia się tam, gdzie dane zmieniają się w czasie i reagują na użytkownika: obsługa kliknięć, wpisywanie w pola formularzy, scroll, ruch myszką. Klasyk to pole wyszukiwania z debounce i dynamiczne filtrowanie list.

Drugi mocny obszar to praca z API: anulowanie starych zapytań, kolejkowanie, retry przy błędach, polling, WebSockety. Dochodzi jeszcze autosave (np. draft posta po chwili bezczynności) i dynamiczne filtrowanie/sortowanie tabel po wielu kryteriach naraz.

Czym różni się Subject od BehaviorSubject w RxJS?

Subject emituje wartości tylko „w przód” – nowy subskrybent dostaje emisje od momentu subskrypcji. BehaviorSubject zawsze przechowuje ostatnią wartość i natychmiast wysyła ją nowemu subskrybentowi.

W praktyce BehaviorSubject lepiej sprawdza się jako nośnik aktualnego stanu (np. zalogowany użytkownik, zawartość koszyka), a „zwykły” Subject jako kanał zdarzeń (np. kliknięcia, akcje użytkownika).

Jak unikać wycieków pamięci przy subskrypcjach RxJS?

Podstawowa zasada: subskrypcje trzeba sprzątać, gdy przestają być potrzebne. W Angularze najczęściej używa się async pipe albo unsubscribe w ngOnDestroy (czasem z pomocą Subjecta typu destroy$ i operatora takeUntil).

W czystym RxJS pomaga trzymanie się wzorca: jedna zmienna z Subscription lub add() na composite subscription, a przy zamykaniu komponentu/klasy wywołanie unsubscribe(). Warto też korzystać z operatorów, które same zakańczają strumień (take, takeUntil, first).

Co to znaczy, że Observable jest „hot” albo „cold”?

Cold observable zaczyna emitować wartości dopiero w momencie subskrypcji i dla każdego subskrybenta „odpala się” osobno (np. from(fetch(...)), of, from tablicy). Każdy obserwator widzi pełną sekwencję od początku.

Hot observable emituje niezależnie od subskrybentów – nowi dołączają „w trakcie filmu” (np. fromEvent, Subject, BehaviorSubject). To ma znaczenie przy projektowaniu strumieni stanu i przy współdzieleniu jednego źródła danych między wielu odbiorców.

Poprzedni artykułJak bezpiecznie korzystać z publicznego Wi‑Fi w podróży służbowej
Jan Walczak
Jan Walczak zajmuje się sztuczną inteligencją i uczeniem maszynowym, łącząc doświadczenie akademickie z praktyką komercyjną. Tworzył i wdrażał modele predykcyjne, systemy rekomendacyjne oraz rozwiązania NLP, dlatego w tekstach na Paczkimp3.pl skupia się na tym, co naprawdę działa w produkcji. Pokazuje, jak przejść od prototypu w notatniku do stabilnej usługi, jak dobierać narzędzia i jak unikać typowych pułapek. Każdy artykuł opiera na eksperymentach, repozytoriach z kodem i oficjalnej dokumentacji, a złożone algorytmy rozkłada na zrozumiałe, praktyczne „paczki” kroków.