[Complete] Sub-RFC 4: Observable and Signal Interoperability · angular/angular · Discussion #49681
Sub-RFC 4: Observable and Signal Interoperability
Changelog
April 10, 2023
fromSignalandfromObservablerenamed totoObservableandtoSignal, respectively.- By default, the type of
toSignalwill beSignal<T|undefined>whereTis the return type of theObservable. - Added section detailing the obstacles to implementing
InteropObservable.
Introduction
Many existing applications and libraries use RxJS to manage state. Observables should be able to interoperate with these applications. To solve this problem, we propose two new APIs: toSignal and toObservable, both exported from the @angular/core/rxjs-interop package. The goal with these functions is to provide a lightweight and sensible default bridge between Observable and Signal concepts. This allows interop with existing RxJS code and optionally enables every Signal-based API in Angular to be used with RxJS, for much more complete support (for example, unlocks inputs as observables).
RxJS is a powerful and feature-rich library and there will certainly be cases that are not covered in these default behaviors. Rather than trying to include every possible feature, we want to make sure what we're including covers most use-cases without bloating the API in order to support uncommon scenarios.
toSignal
You can convert an Observable to an Angular Signal by using the toSignal function.
export function toSignal<T, U extends T|null|undefined>( source: Observable<T>, options: {initialValue: U, requireSync?: false}): Signal<T|U>; export function toSignal<T>( source: Observable<T>, options: {requireSync: true}): Signal<T>;
The two different signatures support two different ways of dealing with the initial value (see the "Asynchronicity" section below).
const counter: Signal<number> = toSignal(counter$);
Managing the subscription
The toSignal function internally subscribes to the given Observable and updates the returned Signal any time the Observable emits a value.
This subscription will be created immediately as opposed to waiting until the Signal is read. We do this intentionally because we do not want the signal read to have side effects (e.g. opening a network connection on subscription). If the subscription were created lazily, then the act of reading the signal for the first time would have the side-effect of creating the Observable subscription and this may have its own side effects. This is also how the async pipe works.
Angular will automatically unsubscribe from the Observable when the context in which it was created is destroyed. For example, if toSignal is called in a component or directive, then the subscription is cleaned up when the component or directive is destroyed. If it is instead created in a service, it will be cleaned up when that service's injector is destroyed.
Asynchronicity
Observables can be used to model both synchronous and asynchronous data flow. However, they don't distinguish these two cases in their API - any Observable might be synchronous, or it might be asynchronous. Signals, on the other hand, are always synchronous. The signature of the toSignal function supports both synchronous and asynchronous Observables.
Initial Values
Before the Observable emits, the signal returned by toSignal must have an "initial" value.
If not provided explicitly, this initial value is undefined (similarly to how the async pipe returns null initially).
const data: Signal<string|undefined> = toSignal(http.get('/data')); console.log(data()); // "undefined" before the data is available
There are many cases where undefined is not the best choice of initial value. For these cases, toSignal allows the initial value to be configured directly:
// The first value will not be emitted until 5 seconds later const secondsObs = interval(5000); // Provide an initial value of zero. const seconds = toSignal(secondsObs, {initialValue: 0}); effect(() => { console.log(seconds()); });
Requiring synchronous emit
Some Observables are known to emit synchronously. In those cases, you can have toSignal verify that the Observable produces a value immediately, and forgo providing or dealing with an initial value.
// BehaviorSubjects emit synchronously: const balance$ = new BehaviorSubject(50); const balance: Signal<number> = toSignal(balance$, {requireSync: true});
This is a trade-off: requiring a synchronous emit avoids any need for handling of undefined values (or manually configuring initial values). However, if balance$ is ever made asynchronous (such as by adding a debounce operation, for example), toSignal will throw an error.
Error and completion states
A signal is a wrapper around a value, which is capable of notifying interested consumers when that value changes. An Observable has three types of notifications when an Observer subscribes to it: next, error, and complete.
A signal's value is directly linked to the values coming from the next notification of the Observable.
When the Observer created by toSignal is notified of an error, it will throw this error the next time the signal is read. This error can be handled the same way any other error coming from a signal would be:
const mySignal = toSignal(myObservable$); try { mySignal(); } catch (e: unknown) { // Handle the error from the observable here }
If handling the error where the signal is used is not desirable, the error can be transformed either on the Observable side (via catchError) or on the signal side (via computed):
const mySignal = toSignal(myObservable$.pipe(catchError(() => of('error')));
const mySignal = toSignal(myObservable$); const mySignalNoErrors = computed(() => { try { return mySignal(); } catch { return 'error'; } );
Signals do not have a concept of being "complete," because they are just wrappers around a value. When an Observable is complete, the Observer created by toSignal will simply not receive new values from the stream. We acknowledge that the completion state does have a meaning for an Observable and sometimes you might care about this. Even with it not being surfaced directly in the returned signal, this state can be represented in a different signal, for example:
const isComplete = signal(false); const signalFromObservable = toSignal( myObservable$.pipe(finalize(() => isComplete.set(true))); );
There’s also a materialize operator that “represents all of the notifications from the source Observable as next emissions marked with their original types within Notification objects.” This pipe could be used to surface both the complete and error notifications.
toObservable
toObservable is a function that takes an Angular Signal and returns an Observable. It does this by creating an effect when the Observable is subscribed to, which takes values from the signal and streams them to subscribers.
const count: Observable<number> = toObservable(counterObs);
Lifecycle and Cleanup
Observables are designed to manage the lifecycle of the side effects they create. When a toObservable Observable is subscribed, it creates an effect to monitor the signal, which exists until that subscriber unsubscribes.
This differs from toSignal, which automatically cleans up its subscription when the component or service which invoked it is destroyed.
We believe it would be incorrect for toObservable to tie the resulting Observable to the lifecycle of the enclosing component, for two reasons:
- The incoming Signal might itself not have the same lifecycle as the component. Signals are valid as long as anyone still has a reference, and the signal may have been passed in externally.
- The Observable might be used in other external RxJS computations, such as being passed into a
switchMap. Moreover, it could be passed into a service and persisted. Finally, observable unsubscriptions might have side-effects, and the user should control when those run, as with other Observables.
If desired, it's straightforward to tie the resulting Observable to the component's lifecycle manually:
const myValue$ = toObservable(myValue).pipe(takeUntil(this.destroy$));
All values are asynchronous
The Observable produced by toObservable uses an effect to send the next value. However, because effects are by nature asynchronous, the Observable emissions will be asynchronous, even for the first value.
Therefore, all values emitted by the toObservable Observable are delivered asynchronously.
This has some interesting consequences. Consider setting a signal several times in a row, which is a synchronous operation:
const obs = toObservable(mySignal); mySignal.set(1); mySignal.set(2); mySignal.set(3);
The observable obs will only ever produce the value 3, because the signal is set synchronously, but the observable is scheduled to update asynchronously. In this way, a Signal is subtly different from a BehaviorSubject. In other words, unlike Observables, signals are not streams.
We do not emit the first value synchronously. To do so, we'd have to skip the first emission in the effect -- but what if the signal updates before the effect is scheduled for the first time? We'd need to check the signal's value equality, which might not be reliable (e.g. a mutated object).
If you want to get the first value synchronously, you can straightforwardly implement this behavior as so:
const obs$ = toObservable(mySignal).pipe(startWith(mySignal()));
One Effect for All Observers (via shareReplay)
If you expect to have many subscribers to an observable produced from a signal, you might want to use the shareReplay operator. This helps you avoid creating a new effect for each subscriber.
This would look something like the following:
const obs = toObservable(mySignal).pipe( shareReplay({refCount: true, bufferSize: 1}) );
refCount is set so that the underlying effect is cleaned up after all users unsubscribe. The bufferSize is one because we only care about caching the latest value.
Why not do this automatically inside toObservable? There are a few potential reasons:
- Inconsistent behavior: The first subscriber will receive its value asynchronously, but later subscribers will be synchronously replayed.
- Code size: We'd have to include
shareReplayfor everyone using RxJS interop, when some applications may not want it. - Easy to implement: As shown above, it's easy to use
shareReplayyourself if you want this behavior. It's not nearly as easy to get rid of it, if you care about the sync/async discrepancy. - Low overhead: Registering multiple effects incurs minimal performance cost, comparable to a subscription to a
shareReplayobservable.
InteropObservable
An idea that came up during the discussion was to make signals implement InteropObservable. While this can work in some situations, we ultimately decided against it for two principle reasons:
- Watching a signal requires an injection context.
This is because effects fundamentally need access to the effect scheduler for the application they're running in, so we'd need to use inject to get ahold of it.
This would be a very surprising and burdensome requirement for anyone trying to put signal reads inside e.g. switchMap, since Observable pipelines don't typically run with injection contexts. You'd have to wrap the operators in runInInjectionContext, which would be painful and boilerplate-y. An explicit conversion from signal to observable outside of the observable pipeline is actually more ergonomic.
For example, the following pattern:
something$.pipe( switchMap(v => dataSignal), )
may function correctly if the initial emit is synchronous, but error later on when the switchMap operator tries to call the InteropObservable API on dataSignal internally the next time something$ emits, because it'll no longer be in an injection context.
Additionally, there is no straightforward way to fix this issue, because the injection context comes from the event source (whatever produced the next value to something$), not the Observable chain creation. So it's not possible to wrap switchMap in runInInjectionContext - a custom, injection-context-aware operator would have to be used instead.
Overall, this adds a level of friction that we felt undermines any potential convenience of InteropObservable.
- Signals are not Observables and converting between them should be an intentional operation.
This part of the argument is more philosophical. Just like how subscribing to an Observable has side effects and manual subscriptions should be done with care, choosing to use the signal graph to manually drive additional data flow should also be done with consideration for the impact on the application's architecture. In some cases it can be necessary, but often there are better patterns such as driving Observable chains from the original application events, rather than as side effects of signal updates.
takeUntilDestroyed
Angular users often want to complete a stream when a related subject completes. The following illustrative pattern is quite common:
destroyed$ = new ReplaySubject<void>(1); data$ = http.get('...').pipe(takeUntil(this.destroyed$)); ngOnDestroy() { this.destroyed$.next(); }
We are introducing a new RxJS operator called takeUntilDestroyed, which simplifies this example into the following:
data$ = http.get('...').pipe(takeUntilDestroyed());
By default, this operator will inject the current cleanup context. For example, used in a component, it will use the component's lifetime.
takeUntilDestroyed is especially useful when you want to tie the lifecycle of your toObservable Observable to a particular component's lifecycle.
However, if you want to override the default behavior, you can manually provide a DestroyRef. This lets you call takeUntilDestroyed outside of a context where inject is available.