10/28/2024

Exploring the RxJS share operator

Recently I tweeted about the RxJS operators I use on a daily basis.

I got some responses asking about use cases for share, so let’s dive into it.

Ever wondered why you see multiple HTTP requests in the network tab of your browser when you use Angular’s HttpClient, even though you are sure that you only make one request? After reading this article, you will know why and how to fix it.

TL;DR

Most observables are unicast sources by default, which means that values are produced anew for every subscriber. By applying share, we can transform an observable into a multicast source, so that the producer function is executed only once and the produced values are shared with all subscribers.

Understanding fundamental observable behavior

Let’s try to understand some fundamental behavior of observables before we dive into share and what use cases it can solve.

Here’s a little example:

import { map, timer } from "rxjs";

// create an observable which will emit one value that gets mapped to a random number
const random$ = timer(1).pipe(map(() => Math.random()));

const subscriberOne = random$.subscribe((value) =>
  console.log("subscriber one: ", value),
);

const subscriberTwo = random$.subscribe((value) =>
  console.log("subscriber two: ", value),
);

It will print something like this to the console:

subscriber one:  0.09236817024332566
subscriber two:  0.11372766697317016

🤯 It might be surprising that the values are not the same.

The reason is that (almost) every observable is a unicast source by default.

Understanding unicast

To understand unicast, we first need to understand how observables actually emit values. In simple terms, to emit values an observable needs to produce them. This is done by executing a so-called producer function.

In our example above, the producer function is everything that describes the random$ observable, meaning: timer(1).pipe(map(() => Math.random())).

As we can see in the graphic below, the producer function is executed for every subscriber.

[Figure: Unicast]

This is the reason why the two subscribers get different values. And this is what unicast means: every time a .subscribe() happens, the producer function is executed, so the source has to do its work again.
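
To make the producer function visible, here is a minimal sketch that builds an observable by hand and counts how often its producer runs. The counter producerRuns and the array received are hypothetical names used only for this illustration.

```typescript
import { Observable } from "rxjs";

// Counts how often the producer function runs.
let producerRuns = 0;

// The function passed to the constructor is the producer function.
const random$ = new Observable<number>((subscriber) => {
  producerRuns++;
  subscriber.next(Math.random());
  subscriber.complete();
});

const received: number[] = [];
random$.subscribe((value) => received.push(value));
random$.subscribe((value) => received.push(value));

console.log(producerRuns); // 2, one run per subscriber
console.log(received.length); // 2, each subscriber got its own value
```

The observable itself is only a description. The work happens when someone subscribes, and it happens once per subscription.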

Exploring Multicast

[Figure: Multicast]

Knowing what unicast means, it should be fairly easy to understand multicast.

Multicast means that the producer function is executed only once, for the first subscriber, and every value it produces is delivered to all current subscribers. A multicast source can also act like a cache: depending on its configuration, it stores the produced value(s) and emits them to subscribers that arrive later.
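
The idea can be sketched by hand with a Subject that sits between the source and its subscribers. This is a simplified illustration of the concept, not how any operator is actually implemented; the names producerRuns, receivedOne and receivedTwo are made up for the example.

```typescript
import { Observable, Subject } from "rxjs";

let producerRuns = 0;

// A unicast source: the producer would run once per subscription.
const source$ = new Observable<number>((subscriber) => {
  producerRuns++;
  subscriber.next(Math.random());
  subscriber.complete();
});

// The Subject acts as the multicast hub.
const hub = new Subject<number>();

const receivedOne: number[] = [];
const receivedTwo: number[] = [];
hub.subscribe((value) => receivedOne.push(value));
hub.subscribe((value) => receivedTwo.push(value));

// Only ONE subscription to the source, made by the hub itself.
source$.subscribe(hub);

console.log(producerRuns); // 1
console.log(receivedOne[0] === receivedTwo[0]); // true
```

Both subscribers listen to the hub, and only the hub subscribes to the source, so the producer runs a single time.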

Transforming unicast to multicast with share

Ok, we know that we somehow need to turn our unicast random$ observable into a multicast observable, so that every subscriber gets the same value.

This is where the share operator comes into play. share is an operator from the group of operators called multicasting operators.

Let’s apply it to our example:

import { map, timer, share } from "rxjs";

// create an observable which will emit one value that gets mapped to a random number
const random$ = timer(1).pipe(
  map(() => Math.random()),
  share(),
);

const subscriberOne = random$.subscribe((value) =>
  console.log("subscriber one: ", value),
);

const subscriberTwo = random$.subscribe((value) =>
  console.log("subscriber two: ", value),
);

The output is now:

subscriber one:  0.07283698435188501
subscriber two:  0.07283698435188501

🥳 we did it!

Solving duplicated HTTP requests

Going back to the opening question about duplicated HTTP requests: the unicast behavior is exactly the answer. Performing the request is the producer function, which gets executed for every subscriber, resulting in multiple requests.

Multiple subscribers do not only come from multiple .subscribe() calls. A common pattern is to have derived observables which are consumed via the async pipe:

@Component({
  selector: "my-component",
  template: `
    @for (data of data$ | async; track data.id) {
      <data-component [data]="data" />
    }

    We got {{ dataLength$ | async }} records
  `,
})
export class MyComponent {
  #dataService = inject(DataService);

  // getData() uses the HTTP-client under the hood
  data$ = this.#dataService.getData();
  dataLength$ = this.data$.pipe(map((data) => data.length));
}

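In this scenario, we have two subscribers to the observable returned by getData(): data$ and dataLength$. Each async pipe subscribes on its own, so the request is sent twice. Adding share() to data$ lets both subscribers use a single request, as long as both subscribe before the response arrives.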
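
One way to remove the duplicate request is to pipe data$ through share(). The sketch below runs without Angular: getData, response$ and requestCount are hypothetical stand-ins for the DataService and a pending HTTP response, so treat it as an illustration of the principle rather than production code.

```typescript
import { Subject, defer, map, share } from "rxjs";

type Item = { id: number };

let requestCount = 0;

// Stands in for the pending HTTP response.
const response$ = new Subject<Item[]>();

// Hypothetical replacement for DataService.getData():
// every subscription counts as one request.
const getData = () =>
  defer(() => {
    requestCount++;
    return response$;
  });

// 👇 share() turns the request into a multicast source
const data$ = getData().pipe(share());
const dataLength$ = data$.pipe(map((data) => data.length));

const lists: Item[][] = [];
const lengths: number[] = [];
data$.subscribe((data) => lists.push(data));
dataLength$.subscribe((length) => lengths.push(length));

// The "server" answers after both subscribers are in place.
response$.next([{ id: 1 }, { id: 2 }]);
response$.complete();

console.log(requestCount); // 1
console.log(lengths); // [ 2 ]
```

In the component this would correspond to data$ = this.#dataService.getData().pipe(share()), with dataLength$ derived from the shared data$ as before.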

Advanced

share allows a very powerful configuration to handle more advanced use cases. When we hear the term cache, some questions automatically pop up:

  • how many values will be cached?
  • how long will the cache live?
  • how will the cache be evicted?

Let’s modify our example a little bit and introduce time to simulate the lifetime of an application where users interact with the application.

import { interval, map, share, take } from "rxjs";

// 👇 we are now emitting a value every 100ms and limit it to 3 values with take(3)
const random$ = interval(100).pipe(
  take(3),
  map(() => Math.random()),
  share(),
);

const subscriberOne = random$.subscribe((value) =>
  console.log("subscriber one: ", value),
);

const subscriberTwo = random$.subscribe((value) =>
  console.log("subscriber two: ", value),
);

// 👇 new subscriber after some time
setTimeout(() => {
  const subscriberThree = random$.subscribe((value) =>
    console.log("subscriber three: ", value),
  );
}, 200);

So now we have a source which produces a random value every 100ms and stops after 3 values. We have two subscribers from the beginning and a third one arriving after 200ms, when 2 values have already been produced.

The console output will look something like this:

subscriber one:    0.07283698435188501
subscriber two:    0.07283698435188501

subscriber one:    0.012423800828001097
subscriber two:    0.012423800828001097

subscriber one:    0.851868279737849
subscriber two:    0.851868279737849
subscriber three:  0.851868279737849

As we can see, the third subscriber gets the same last value, but not the previous values.

Configuring how many values should be cached

Let’s dive a little into configuring the behavior of share, so that the third subscriber gets all values, no matter when they were produced.

We only need to make a slight configuration change to the share operator applied to random$:

share({ connector: () => new ReplaySubject(3) });

The connector property expects a factory function that returns a Subject, which is used internally to store the values. A ReplaySubject can store the last n values; here we set n to 3. Without specifying n, all values will be cached.
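
The buffer behavior of ReplaySubject can be observed in isolation, without share. The following sketch uses two made-up subjects, lastTwo$ and everything$, to compare a bounded and an unbounded buffer.

```typescript
import { ReplaySubject } from "rxjs";

// Keeps only the last 2 values.
const lastTwo$ = new ReplaySubject<number>(2);
// No buffer size given: keeps every value.
const everything$ = new ReplaySubject<number>();

for (const value of [1, 2, 3]) {
  lastTwo$.next(value);
  everything$.next(value);
}

// Both subscribers arrive after all values were emitted.
const fromLastTwo: number[] = [];
const fromEverything: number[] = [];
lastTwo$.subscribe((value) => fromLastTwo.push(value));
everything$.subscribe((value) => fromEverything.push(value));

console.log(fromLastTwo); // [ 2, 3 ]
console.log(fromEverything); // [ 1, 2, 3 ]
```

An unbounded buffer grows with every emitted value, which is worth keeping in mind for long-living sources.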

Applying this configuration to our example, we get the following output:

subscriber one:    0.07283698435188501
subscriber two:    0.07283698435188501

subscriber one:    0.012423800828001097
subscriber two:    0.012423800828001097
// 👇 this changed
subscriber three:  0.07283698435188501
subscriber three:  0.012423800828001097

subscriber one:    0.851868279737849
subscriber two:    0.851868279737849
subscriber three:  0.851868279737849

As we can see, the third subscriber receives the previously produced values as soon as it subscribes.

A reasonable default for most real-world use cases is to use a ReplaySubject with a buffer size of 1 to cache only the most recent value.
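
A small sketch of that setup, using a manually driven Subject as the source so that the timing is explicit. The names source$, latest$, early and late are chosen for this example only.

```typescript
import { ReplaySubject, Subject, share } from "rxjs";

// A source we can push values into by hand.
const source$ = new Subject<number>();

const latest$ = source$.pipe(
  // 👇 cache only the most recent value
  share({ connector: () => new ReplaySubject<number>(1) }),
);

const early: number[] = [];
const late: number[] = [];

latest$.subscribe((value) => early.push(value));
source$.next(1);
source$.next(2);

// The late subscriber immediately receives the cached value 2.
latest$.subscribe((value) => late.push(value));
source$.next(3);

console.log(early); // [ 1, 2, 3 ]
console.log(late); // [ 2, 3 ]
```

The late subscriber starts with the current state instead of waiting for the next emission, which is usually what a UI wants.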

Configuring how long the cache should live

I will only demonstrate a very simple example here; there are a lot of different, powerful ways to configure the cache lifetime.

Let’s modify our example one last time:

import { ReplaySubject, interval, map, share, take } from "rxjs";

const random$ = interval(100).pipe(
  take(3),
  map(() => Math.random()),
  share({
    connector: () => new ReplaySubject(3),
    // 👇 we added this
    resetOnRefCountZero: true,
  }),
);

const subscriberOne = random$.subscribe((value) =>
  console.log("subscriber one: ", value),
);

const subscriberTwo = random$.subscribe((value) =>
  console.log("subscriber two: ", value),
);

// 👇 we added this
setTimeout(() => {
  subscriberOne.unsubscribe();
  subscriberTwo.unsubscribe();
}, 400);

setTimeout(() => {
  const subscriberThree = random$.subscribe((value) =>
    console.log("subscriber three: ", value),
  );
  // 👇 changed
}, 500);

Now the third subscriber arrives after 500ms, when all values have already been produced. Subscribers one and two unsubscribe after 400ms.

Looking at our output:

subscriber one:    0.07283698435188501
subscriber two:    0.07283698435188501

subscriber one:    0.012423800828001097
subscriber two:    0.012423800828001097

subscriber one:    0.851868279737849
subscriber two:    0.851868279737849
subscriber three:  0.1742209919468578
subscriber three:  0.6092285719586927
subscriber three:  0.35207890593724844

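subscriberThree gets 3 values, but completely different ones than the other two subscribers.

Let’s break down why this happens:

By adding resetOnRefCountZero: true to the share config, we specified that the cache should be reset when the number of subscribers drops to zero. In this example that already happens when the source completes after its third value (at roughly 300 ms), because completion ends the subscriptions of subscriberOne and subscriberTwo; the unsubscribe calls at 400 ms do not change anything anymore. Note that share also resets on completion by default (resetOnComplete: true), so either setting alone would clear the cache here.

When subscriberThree subscribes to random$, the producer function is executed again, emitting three different random numbers.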

All configuration possibilities of share are documented in the RxJS docs.

An important hint about resetOnRefCountZero: by default it is set to true, which is a good default. Setting it to false has the potential to cause memory leaks, because the subscription to the source stays alive even when nobody is listening anymore.
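
The difference between the two settings can be made visible by counting subscriptions to the source. In this sketch, countSourceSubscriptions is a hypothetical helper and the source is a Subject that never emits or completes, so only the subscriber count influences the result.

```typescript
import { Subject, defer, share } from "rxjs";

function countSourceSubscriptions(resetOnRefCountZero: boolean): number {
  let sourceSubscriptions = 0;

  // A source that never emits and never completes.
  const never$ = new Subject<number>();

  const shared$ = defer(() => {
    sourceSubscriptions++;
    return never$;
  }).pipe(share({ resetOnRefCountZero }));

  // First subscriber comes and goes: subscriber count 1 -> 0.
  shared$.subscribe().unsubscribe();
  // A second subscriber arrives later.
  shared$.subscribe().unsubscribe();

  return sourceSubscriptions;
}

console.log(countSourceSubscriptions(true)); // 2, the source was re-subscribed
console.log(countSourceSubscriptions(false)); // 1, the first connection was kept
```

With false, the connection to the source survives even after the last subscriber has left. That saves work when subscribers come back, but the source subscription is never cleaned up unless the source completes or errors on its own.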

Summary

share is a very powerful operator, which can be used to solve a lot of use cases.

It is important to understand the basics of unicast and multicast before using share, so that you can use it correctly.

If you want to learn more about share and other operators, I recommend reading the RxJS docs.