Table of contents
During this article, you’ll get a full insight into how promises differ with regard to observables. We’ll try to cover all you need to know in one place. Each topic is going to be followed by code examples. So, you’ll better believe you won’t get bored here.
Pull vs Push
Pull and Push are two different protocols that describe how a data Producer can communicate with a data Consumer.
What is Pull? In Pull systems, the Consumer determines when it receives data from the data Producer. The Producer itself is unaware of when the data will be delivered to the Consumer.
What is Push? In Push systems, the Producer determines when to send data to the Consumer. The Consumer is unaware of when it will receive that data.
Promises
Promises are the most common type of Push system in JavaScript today. A Promise (the Producer) delivers a resolved value to registered callbacks (the Consumers), but unlike functions, it is the Promise which is in charge of determining precisely when that value is "pushed" to the callbacks.
Observables
Observable is a new Push system for JavaScript. An Observable is a Producer of multiple values, “pushing” them to Observers (Consumers).
You can find out more about the push and pull-based systems by referring to the documentation, where I took a bare minimum.
Single vs Multiple
Observables
Observables can emit either multiple values over a period of time or a single value at a time.
Promises
Unlike observables promises only can emit a single value at a time.
Async Promise vs Async/Sync Observable
Promise is always asynchronous. Here is an example that demonstrates this in action:
console.log('--- Start ---');
Promise.resolve('Promise').then(console.log)
console.log('--- End ---');
Observable might be either asynchronous or synchronous. Let’s take a look at an asynchronous example first:
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';
console.log('--- Start ---');
const obs = interval(1000).pipe(take(10));
obs.subscribe(console.log);
console.log('--- End ---');
Here is an example of synchronous observable:
import { range } from 'rxjs';
console.log('--- Start ---');
const obs = range(0, 5);
obs.subscribe(console.log);
console.log('--- End ---');
Convert Promise to Observable
from operator
The easiest way to convert promise to observable is to use
Of course, there is no need to unsubscribe us observable will be complete as soon as the promise will be resolved.
import { from } from 'rxjs';
from(Promise.resolve(2))
.subscribe({
next: console.log,
error: console.error,
complete: () => console.log(`complete`)
});
fromFetch operator
Promise returned by
import { of } from 'rxjs';
import { fromFetch } from 'rxjs/fetch';
import { switchMap, catchError } from 'rxjs/operators';
const toError = (message: string) => of({ error: true, message });
const data$ = fromFetch('https://api.github.com/users?per_page=1')
.pipe(
switchMap((response: Response) => {
return response.ok
? response.json()
: toError(`Error: ${response.status}`);
}),
catchError((err: Error) => {
return toError(err.message);
})
);
data$.subscribe({
next: console.log,
complete: () => console.log('done')
});
Important to note, that some parts are still in the experimental state. To get more familiar you can refer to the documentation.
More about fetch API find on MDN Web Docs.
Convert Observable to Promise
toPromise - from RxJS7 is deprecated
Converts Observable to a Promise, which will be resolved once observable complete. The last emission will be returned as a resolved value.
If observable won’t complete, we will end up with a promise that won’t be resolved.
import { Observable, Subscriber } from 'rxjs';
const obs = new Observable((subscriber: Subscriber<number>) => {
subscriber.next(1);
subscriber.complete();
});
obs.subscribe(console.log);
firstValueFrom — first emitted value
Converts an observable to a promise by subscribing to the observable, and returning a promise that will resolve as soon as the first value arrives from the observable. The subscription will then be closed.
firstValueFrom doesn’t wait for the observable to complete:
import { firstValueFrom, Observable, Subject } from 'rxjs';
async function demo<T>(observable: Observable<T>): Promise<void> {
const result = await firstValueFrom(observable);
console.log(result);
}
const subject = new Subject();
const obs = subject.asObservable();
demo(obs);
subject.next(1);
subject.next(2);
subject.next(3);
/* Observable is not completed yet, and will proceed to emit values */
obs.subscribe(console.log);
subject.next(5);
- When observable completes before the values arrive, the promise will be rejected with
EmptyError
import { EmptyError, firstValueFrom, Observable, Subject } from 'rxjs';
async function demo<T>(observable: Observable<T>): Promise<void> {
try {
const result = await firstValueFrom(observable);
console.log(result);
} catch (error) {
console.log(error instanceof EmptyError);
console.error(error);
}
}
const subject = new Subject();
demo(subject.asObservable());
subject.complete();
- When observable completes before the values arrive, we get an
EmptyError . To prevent this, we might pass a second argument which basically is a fallback.
import { firstValueFrom, Observable, Subject } from 'rxjs';
async function demo<T>(observable: Observable<T>): Promise<void> {
try {
const result = await firstValueFrom(observable, { defaultValue: 25 });
console.log(result);
} catch (error) {
console.error(error);
}
}
const subject = new Subject();
demo(subject.asObservable());
subject.complete();
lastValueFrom - last emitted value
For the most part, it's pretty similar to
- Converts observable to promise, and waits for it to complete. When observable completes, the returned promise is resolved with the last emitted value.
import { lastValueFrom, Observable, Subject } from 'rxjs';
async function demo<T>(observable: Observable<T>): Promise<void> {
const result = await lastValueFrom(observable);
console.log(result);
}
const subject = new Subject();
demo(subject.asObservable());
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.next(5);
subject.complete();
- If observable completes before the values arrive, returned promise will be rejected with the
EmptyError
import { EmptyError, lastValueFrom, Observable, Subject } from 'rxjs';
async function demo<T>(observable: Observable<T>): Promise<void> {
try {
const promiseResult = await lastValueFrom(observable);
console.log(promiseResult);
} catch (error) {
console.log(error instanceof EmptyError);
console.error(error);
}
}
const subject = new Subject();
demo(subject.asObservable());
subject.complete();
- When observable completes before the values arrive, we get an
EmptyError . To prevent this, we might pass a second argument which basically is a fallback.
import { EmptyError, lastValueFrom, Observable, Subject } from 'rxjs';
async function demo<T>(observable: Observable<T>): Promise<void> {
try {
const promiseResult = await lastValueFrom(observable, { defaultValue: 25 });
console.log(promiseResult);
} catch (error) {
console.log(error instanceof EmptyError);
console.error(error);
}
}
const subject = new Subject();
demo(subject.asObservable());
subject.complete();
Does RxJS work with Promises?
Yes, it does. Many operators work with Promises as well as with observables.
Here’s a little case study of this strategy in action:
import {
combineLatest,
of,
merge,
concat,
exhaustMap,
mergeMap,
concatMap,
forkJoin,
} from 'rxjs';
import { switchMap, withLatestFrom } from 'rxjs/operators';
combineLatest([Promise.resolve(2), Promise.resolve(3)]).subscribe((value) =>
console.log(`combineLatest: ${value}`)
);
forkJoin({ obs1: Promise.resolve(1), obs2: Promise.resolve(2) }).subscribe(
(value) => console.log(`forkJoin: ${JSON.stringify(value)}`)
);
of(1)
.pipe(withLatestFrom(Promise.resolve(3)))
.subscribe((value) => console.log(`withLatestFrom: ${value}`));
of(1)
.pipe(switchMap((value) => of(5)))
.subscribe((value) => console.log(`switchMap: ${value}`)); // 5
of(1)
.pipe(exhaustMap((value) => of(5)))
.subscribe((value) => console.log(`exhaustMap: ${value}`)); // 5
of(1)
.pipe(mergeMap((value) => of(5)))
.subscribe((value) => console.log(`mergeMap: ${value}`)); // 5
of(1)
.pipe(concatMap((value) => of(5)))
.subscribe((value) => console.log(`concatMap: ${value}`)); // 5
merge(Promise.resolve(1), Promise.resolve(2)).subscribe((value) =>
console.log(`merge: ${value}`)
);
concat(Promise.resolve(1), Promise.resolve(2)).subscribe((value) =>
console.log(`concat: ${value}`)
);
I’ve picked the most popular, such as:
switchMap exhaustMap mergeMap concatMap combineLatest forkJoin withLatestFrom merge concat
and only
You can check whether the operator accepts the Promise by checking if it accepts the
And now, you’re probably thinking how awesome this is, but not really.
Important to note here is that you better use the
More details on this can be found in the documentation.
Promise boxed into Observable is Hot
Promise is eager and will start to produce value right away, even if there is no subscription. Here is an example:
import { from } from 'rxjs';
const fromPromise$ = from(
new Promise((resolve, reject) => {
console.log('Will be invoked only once, and right away.')
resolve(3);
})
);
The value produced by Promise is produced outside and runs before any subscription, thus it's hot.
Promise itself will run only once, and produced value will be shipped to all subscribers.
import { from } from 'rxjs';
function getRandomInt(max: number): number {
return Math.floor(Math.random() * max);
}
const fromPromise$ = from(
new Promise((resolve, reject) => {
console.log('Promise will be invoked only once, right away, and with the same value.');
resolve(getRandomInt(100));
})
);
fromPromise$.subscribe(console.log);
fromPromise$.subscribe(console.log);
fromPromise$.subscribe(console.log);
Let's make it cold using defer operator:
To make it cold, we can apply the defer operator:
import { defer } from 'rxjs';
function getRandomInt(max: number): number {
return Math.floor(Math.random() * max);
}
const fromPromise$ = defer(() =>
new Promise((resolve, reject) => {
console.log('Promise will run multiple times, and produce different values.')
resolve(getRandomInt(100));
})
)
fromPromise$.subscribe(console.log);
fromPromise$.subscribe(console.log);
fromPromise$.subscribe(console.log);
Unicast vs Multicast
Observable is unicast because each observer has its own instance of the data producer. On the other hand, Observable is multicast if each observer receives notifications from the same producer.
Here is an example of unicast observable:
import { Observable } from 'rxjs';
function getRandomInt(max: number): number {
return Math.floor(Math.random() * max);
}
const randomInt = getRandomInt(100);
const obs = new Observable(subscriber => {
subscriber.next(randomInt);
subscriber.complete();
});
obs.subscribe((value: number) => console.log(`1st subscription: ${value}`));
obs.subscribe((value: number) => console.log(`2st subscription: ${value}`));
and this is multicasting:
import { Observable } from 'rxjs';
function getRandomInt(max: number): number {
return Math.floor(Math.random() * max);
}
const obs = new Observable(subscriber => {
const randomInt = getRandomInt(100);
subscriber.next(randomInt);
subscriber.complete();
});
obs.subscribe((value: number) => console.log(`1st subscription: ${value}`));
obs.subscribe((value: number) => console.log(`2st subscription: ${value}`));
Obviously, that Promise is unicast.
Initial load vs lack of functionality
RxJS is not a native API. That means we would need to load the library initially. Obviously, this is a downside. Let’s take a look at the size:
The size isn’t big, but the room for doubt still is, especially, when you work on a smaller project.
Let’s take a look from the other side. Promises are native API, but the promises themselves do not provide even half of the functionality that exists in RxJS. Something like canceling, retries, debouncing, throttling, different types of combinations, etc. is not in place. This means, if you’re interested in similar functionality, you would need to bring up some new library. Likely, the best option from the promise libraries is bluebird.js.
Let’s take a closer look at bluebird.js size: