Handling Video Events with RxJS

In this post, I am going to explore RxJS by implementing a feature twice—once using RxJS and once using a more traditional, iterative approach. I am going to add to both simultaneously, giving a running comparison of the two implementations. In the end, I hope to give you a better idea of what it’s like to think in RxJS, and whether it’s a good (or not so good) choice for your project.

Instead of explaining in-depth what RxJS does (which can be notoriously overwhelming at first glance), I am going to dive right into an implementation, providing explanation as we go where necessary.

What We're Building

We're going to create a quartile tracker for HTML video. This is a somewhat common feature in the media world—we want to do something (probably send a beacon to some analytics provider) whenever we reach a certain point in a video as a means of measuring how engaged users are with our content. For our purposes, a simple console.log will do just fine. The API we’re aiming for is fairly straightforward:

  1. We provide an HTML video element and a callback we would like to be, well, called, whenever we reach a quartile (that is, the 25%, 50% and 75% marks in the video).
  2. For now, we just expect the index of the quartile (1, 2 or 3) to be passed to our callback.
  3. For a playlist of videos, each quartile should fire at most once per video, as soon as playback exceeds that threshold. Once a new video starts, we can emit a new set of quartiles.

In other words, as soon as we reach the 25% mark, our quartile fires with the number 1. If we seek back in the video and pass 25% again, we should not get another quartile. If we immediately seek to 85% of the way through the video, all three quartiles should fire.
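
To make the seek-ahead case concrete, here is a small sketch of the threshold arithmetic. The helper name quartilesCrossed is hypothetical and is not part of the implementation built below; it only shows which indexes a given progress value has passed.

```javascript
// Hypothetical helper: which quartile indexes has this progress value passed?
// `progress` is currentTime / duration, a number between 0 and 1.
const QUARTILE_INDEXES = [1, 2, 3];

const quartilesCrossed = progress =>
  QUARTILE_INDEXES.filter(index => progress > index / 4);

console.log(quartilesCrossed(0.1));  // []
console.log(quartilesCrossed(0.3));  // [ 1 ]
console.log(quartilesCrossed(0.85)); // [ 1, 2, 3 ]
```

The comparison is strict, so a progress of exactly 0.25 does not count as having passed the first quartile yet.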

Listening to Events

We will move toward the intended behavior step by step. Here is our signature (and initial implementation):

// quartiles.js
const quartiles = (videoEl, sendQuartile) => {
  sendQuartile();
};

And the sendQuartile function we’ll be passing in:

// client code
const sendQuartile = index => {
  console.log(`REACHED INDEX: ${index}`);
};

Let’s start by reacting in some way to the video progressing. We’ll just beacon on all time updates. Here’s the first approach:

// quartiles.js
const quartiles = (videoEl, sendQuartile) => {
  const onTimeUpdate = () => {
    const progress = videoEl.currentTime / videoEl.duration;
    sendQuartile(progress);
  };

  videoEl.addEventListener('timeupdate', onTimeUpdate);
};

export default quartiles;

And here’s the same thing using RxJS:

// quartilesRx.js
import { fromEvent } from 'rxjs';

const quartilesRx$ = (videoEl, sendQuartile) => {
  fromEvent(videoEl, 'timeupdate').subscribe(() => {
    const progress = videoEl.currentTime / videoEl.duration;
    sendQuartile(progress);
  });
};

export default quartilesRx$;

The first thing you’ll notice is that the two approaches are virtually the same. Rx seems to give us a convenience function for listening to events, but that’s about it. Here’s a slightly altered version:

// quartilesRx.js
import { map } from 'rxjs/operators';
import { fromEvent } from 'rxjs';

const getProgress$ = videoEl => {
  return fromEvent(videoEl, 'timeupdate').pipe(map(() => {
    return videoEl.currentTime / videoEl.duration;
  }));
};

const quartilesRx$ = (videoEl, sendQuartile) => {
  getProgress$(videoEl).subscribe(sendQuartile);
};

export default quartilesRx$;

I’ve moved the progress calculation out of the subscribe callback and into an invocation of the map operator. There are a few things going on here:

  1. fromEvent() is returning an Observable. This is the core datatype of RxJS. Observables all have a subscribe method that takes a callback to which it passes events. For now you can think of it as similar to a Promise with its .then method, except that the callback can be invoked many times rather than just once.
  2. pipe() is a method on Observable that takes in one or more operators that transform the Observable into another Observable. This is similar to how you can call .map or .filter on an array and be returned another array.
  3. In this case we are using a single operator, map. For each event from the input Observable (the one returned by fromEvent), the map function is applied and the result is emitted on the output Observable (the one returned by getProgress$).

So we’re taking an Observable that emits a bunch of time update events (which are just typical HTML events), and transforming it into an Observable that emits the amount of relative time that has elapsed in the video.

We will be following this pattern throughout this feature—keeping logic out of the subscribe function. In general, we’ll see how keeping the core logic for this feature upstream of the subscribe function will allow us more flexibility down the line.
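
If it helps to see the moving parts, the following is a deliberately tiny stand-in for fromEvent and map, written in plain JavaScript. It is not how RxJS is implemented, and the names tinyFromEvent, tinyMap and fakeVideo are invented for this sketch. The point is only that an Observable wraps a subscription, and an operator takes one Observable and returns another.

```javascript
// A toy Observable: an object with a subscribe(next) method.
const tinyFromEvent = (target, eventName) => ({
  subscribe(next) {
    target.addEventListener(eventName, next);
    return { unsubscribe: () => target.removeEventListener(eventName, next) };
  },
});

// A toy operator: takes an Observable, returns a new one with values transformed.
const tinyMap = fn => source => ({
  subscribe(next) {
    return source.subscribe(value => next(fn(value)));
  },
});

// Hypothetical stand-in for a video element so the sketch runs outside a browser.
// It ignores the event name because only 'timeupdate' is used here.
const fakeVideo = {
  currentTime: 0,
  duration: 100,
  listeners: new Set(),
  addEventListener(name, fn) { this.listeners.add(fn); },
  removeEventListener(name, fn) { this.listeners.delete(fn); },
  tick(time) {
    this.currentTime = time;
    this.listeners.forEach(fn => fn({ type: 'timeupdate' }));
  },
};

const progress$ = tinyMap(() => fakeVideo.currentTime / fakeVideo.duration)(
  tinyFromEvent(fakeVideo, 'timeupdate')
);

const seen = [];
const subscription = progress$.subscribe(progress => seen.push(progress));
fakeVideo.tick(25);
fakeVideo.tick(50);
subscription.unsubscribe();
fakeVideo.tick(75); // ignored: the listener has been removed
console.log(seen); // [ 0.25, 0.5 ]
```

Notice that nothing listens to the fake element until subscribe is called, which is the same laziness the real fromEvent Observable has.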

Transforming Events

Emitting a constant, indiscriminate stream of relative time events doesn’t feel very quartile-like. Let’s emit actual quartile indexes (1, 2 or 3), and ensure we only do so after we reach the appropriate point in the video:

// quartiles.js
const QUARTILE_INDEXES = [1, 2, 3];

const quartiles = (videoEl, sendQuartile) => {
  const onTimeUpdate = () => {
    const progress = videoEl.currentTime / videoEl.duration;
    QUARTILE_INDEXES.forEach(index => {
      if (progress > index / 4) {
        sendQuartile(index);
      }
    });
  };

  videoEl.addEventListener('timeupdate', onTimeUpdate);
};

export default quartiles;

// quartilesRx.js
import { map, mergeAll, filter, mapTo } from 'rxjs/operators';
import { fromEvent, from } from 'rxjs';

const QUARTILE_INDEXES = [1, 2, 3];

const getProgress$ = videoEl => {
  return fromEvent(videoEl, 'timeupdate').pipe(map(() => {
    return videoEl.currentTime / videoEl.duration;
  }));
};

const getQuartile$ = progress$ => index => progress$.pipe(
  filter(progress => progress > index / 4),
  mapTo(index)
);

const getQuartiles$ = videoEl => {
  const progress$ = getProgress$(videoEl);
  const quartiles = QUARTILE_INDEXES.map(getQuartile$(progress$));
  return from(quartiles).pipe(mergeAll());
};

const quartilesRx$ = (videoEl, sendQuartile) => {
  getQuartiles$(videoEl).subscribe(sendQuartile);
};

export default quartilesRx$;

In the first version, we simply loop through our indexes whenever there is a time update and check whether we should send a beacon.

In the Rx version, we build from smaller components. First, we create getQuartile$, which takes our progress$ Observable and an index, and returns an Observable that filters out any events that happen before that quartile.

In getQuartiles$, we create an array of Observables by mapping getQuartile$ over our indexes. Then we do two things:

  1. Turn the array into an Observable using from().
  2. Flatten that Observable of Observables into a single Observable using mergeAll(). This is the equivalent of flattening an array of arrays using .flat().

Oftentimes, this pairing of .map() and .mergeAll() can be replaced with a single call to mergeMap(), as seen in this equivalent code:

// quartilesRx.js
import { map, mergeMap, filter, mapTo } from 'rxjs/operators';
import { fromEvent, from } from 'rxjs';

const QUARTILE_INDEXES = [1, 2, 3];

const getProgress$ = videoEl => {
  return fromEvent(videoEl, 'timeupdate').pipe(map(() => {
    return videoEl.currentTime / videoEl.duration;
  }));
};

const getQuartile$ = progress$ => index => progress$.pipe(
  filter(progress => progress > index / 4),
  mapTo(index)
);

const getQuartiles$ = videoEl => {
  const progress$ = getProgress$(videoEl);
  return from(QUARTILE_INDEXES).pipe(mergeMap(getQuartile$(progress$)));
};

const quartilesRx$ = (videoEl, sendQuartile) => {
  getQuartiles$(videoEl).subscribe(sendQuartile);
};

export default quartilesRx$;
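
The array analogy carries over to this refactor. As a rough parallel (arrays are synchronous and finite, so this is only an analogy), mapping and then flattening an array gives the same result as a single flatMap call. The expand function is a made-up stand-in for getQuartile$.

```javascript
const QUARTILE_INDEXES = [1, 2, 3];

// Made-up stand-in for getQuartile$: each index expands into an inner collection.
const expand = index => [index, index * 10];

const mappedThenFlattened = QUARTILE_INDEXES.map(expand).flat(); // like map + mergeAll
const flatMapped = QUARTILE_INDEXES.flatMap(expand);             // like mergeMap

console.log(mappedThenFlattened); // [ 1, 10, 2, 20, 3, 30 ]
console.log(flatMapped);          // [ 1, 10, 2, 20, 3, 30 ]
```

One place the analogy breaks down: .flat() keeps inner items in source order, whereas mergeAll() and mergeMap() emit values from the inner Observables in whatever order they arrive over time.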

In some ways the Rx version admittedly feels like a step backward. It’s 10 lines longer, and not particularly easier to read. However, what it lacks in brevity, it makes up for in modularity. Each function returns a well-defined Observable, which could be reused or (more importantly) subscribed to in its own right for some other feature. What’s more, making everything an Observable each step of the way will allow us to leverage some other RxJS operators in a moment.

Oh and by the way, if brevity is your thing, Rx is pretty good at that too (though for some, it leaves something to be desired in terms of code style):

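// quartilesRx.js (alternative)
import { map, mergeMap, filter, mapTo } from 'rxjs/operators';
import { fromEvent, from } from 'rxjs';

const QUARTILE_INDEXES = [1, 2, 3];
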
const quartilesRx$ = (videoEl, sendQuartile) => {
  from(QUARTILE_INDEXES).pipe(mergeMap(index => {
    return fromEvent(videoEl, 'timeupdate').pipe(
      map(() => videoEl.currentTime / videoEl.duration),
      filter(progress => progress > index / 4),
      mapTo(index)
    );
  })).subscribe(sendQuartile);
};

Let’s now fulfill the requirement that each quartile fire at most once per video:

// quartiles.js (unchanged code omitted)
...
const quartiles = (videoEl, sendQuartile) => {
  const indexesReached = new Set();

  const onTimeUpdate = () => {
    const progress = videoEl.currentTime / videoEl.duration;
    QUARTILE_INDEXES.forEach(index => {
      if (progress > index / 4 && !indexesReached.has(index)) {
        indexesReached.add(index);
        sendQuartile(index);
      }
    });
  };
...

// quartilesRx.js (unchanged code omitted)
// Note: take must be added to the 'rxjs/operators' import.
...
const getQuartile$ = progress$ => index => progress$.pipe(
  filter(progress => progress > index / 4),
  take(1),
  mapTo(index)
);
...

This is where moving stuff around on the Rx side starts to pay off a bit. The take(1) operator basically says, “Give me an Observable that is the input Observable, except that it emits only once before completing.” It adds implicit state to getQuartile$, telling it to be the sort of thing that emits only once, which is semantically exactly how a quartile is defined (for a given video).

The other version isn’t so bad either, except that the state has to be explicit. This is pretty succinct, thanks to the use of an ES6 Set, but it’s somewhat dishonest about its level of immutability: even though indexesReached is declared with const, the members of the Set can still change. Nevertheless, maintaining that Set and tossing another check into our existing if statement gives us what we need.
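
To see the at-most-once rule without a browser, the sketch below pulls the Set-based check out into a function that is fed progress values directly. createTracker is a hypothetical name used only here; the logic inside mirrors the imperative onTimeUpdate handler.

```javascript
const QUARTILE_INDEXES = [1, 2, 3];

// Hypothetical helper: returns a function to call with each progress value (0 to 1).
const createTracker = sendQuartile => {
  const indexesReached = new Set(); // explicit state: which quartiles already fired
  return progress => {
    QUARTILE_INDEXES.forEach(index => {
      if (progress > index / 4 && !indexesReached.has(index)) {
        indexesReached.add(index);
        sendQuartile(index);
      }
    });
  };
};

const fired = [];
const onProgress = createTracker(index => fired.push(index));

onProgress(0.3);  // passes 25%: fires 1
onProgress(0.1);  // user seeks back: nothing fires
onProgress(0.3);  // passes 25% again: still nothing
onProgress(0.85); // jumps past 50% and 75%: fires 2 and 3
console.log(fired); // [ 1, 2, 3 ]
```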

Replaying Observables

So, there’s a problem with both implementations—neither of them works for more than one video. We need to make sure we get a fresh set of quartiles every time a new video begins. It just so happens that the ‘loadeddata’ HTML video event will tell us exactly that, so let’s use it:

// quartiles.js (unchanged code omitted)
...
const quartiles = (videoEl, sendQuartile) => {
  const indexesReached = new Set();

  const onStart = () => {
    indexesReached.clear();
  };

  const onTimeUpdate = () => {
    const progress = videoEl.currentTime / videoEl.duration;
    QUARTILE_INDEXES.forEach(index => {
      if (progress > index / 4 && !indexesReached.has(index)) {
        indexesReached.add(index);
        sendQuartile(index);
      }
    });
  };

  videoEl.addEventListener('timeupdate', onTimeUpdate);
  videoEl.addEventListener('loadeddata', onStart);
};
...

// quartilesRx.js (unchanged code omitted)
// Note: switchMapTo must be added to the 'rxjs/operators' import.
...
const getQuartilesForVideo$ = videoEl => {
  const progress$ = getProgress$(videoEl);
  return from(QUARTILE_INDEXES).pipe(mergeMap(getQuartile$(progress$)));
};

const getQuartiles$ = videoEl => {
  return fromEvent(videoEl, 'loadeddata').pipe(
    switchMapTo(getQuartilesForVideo$(videoEl))
  );
};
...

The non-Rx version is pretty straightforward: we simply clear the indexesReached Set() whenever a video starts.

The Rx version is quite different, since there’s nothing for us to clear. First, we simply rename our existing function to getQuartilesForVideo$, since it already does what we want, but for only one video.

Then, we lift that function into a switchMapTo over a ‘loadeddata’ fromEvent. Each time the source Observable emits, switchMapTo subscribes afresh to the inner Observable and forwards its events instead. Because the inner Observable only attaches its listeners and starts counting once it is subscribed to, every subscription behaves like a brand-new set of quartiles. If the previous inner subscription is still active when the source Observable emits again, switchMapTo unsubscribes from it before starting the next one.

That is the behavior we want. Every time we get a new video, we get a fresh quartiles Observable.
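
The switching behavior can be sketched in plain JavaScript. This is a toy model under simplifying assumptions, not the RxJS implementation: an "observable" here is just a function that takes a callback and returns an unsubscribe function, and createEmitter, switchTo and firstTick are names invented for the illustration.

```javascript
// Toy event source: emit(value) pushes to every current subscriber.
const createEmitter = () => {
  const listeners = new Set();
  return {
    emit: value => [...listeners].forEach(fn => fn(value)),
    subscribe: next => {
      listeners.add(next);
      return () => listeners.delete(next); // unsubscribe function
    },
  };
};

// Toy switchMapTo: every source emission replaces the inner subscription.
const switchTo = (source, inner) => next => {
  let stopInner = () => {};
  const stopSource = source(() => {
    stopInner();             // cancel the previous inner subscription, if any
    stopInner = inner(next); // subscribe afresh to the inner observable
  });
  return () => { stopInner(); stopSource(); };
};

const loads = createEmitter(); // stands in for 'loadeddata'
const ticks = createEmitter(); // stands in for progress values

// Inner observable with per-subscription state: forwards only its first value.
const firstTick = next => {
  let done = false;
  return ticks.subscribe(value => {
    if (!done) { done = true; next(value); }
  });
};

const log = [];
const stop = switchTo(loads.subscribe, firstTick)(value => log.push(value));

loads.emit();    // first video starts
ticks.emit('a'); // forwarded
ticks.emit('b'); // dropped: this subscription already emitted
loads.emit();    // second video starts, state is fresh
ticks.emit('c'); // forwarded
stop();
console.log(log); // [ 'a', 'c' ]
```

The once-only state lives inside each inner subscription, so starting a new subscription is what resets it. Nothing has to be cleared by hand.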

Cleaning Up After Ourselves

At this point we've actually met all of the listed requirements! But I'd like to add one more feature: the ability to cancel these quartiles. It would be nice if, when a client created the quartiles, it was given a function in return that could be used to stop the quartiles from firing (and, being the good browser citizens that we are, clean up any event listeners). Using it would look something like:

// client code
const stopQuartilesFn = quartilesRx$(videoEl, sendQuartile);
...
//some time later...
stopQuartilesFn();

Let’s see how that would work:

// quartiles.js (unchanged code omitted)
...
  videoEl.addEventListener('timeupdate', onTimeUpdate);
  videoEl.addEventListener('loadeddata', onStart);

  return () => {
    videoEl.removeEventListener('timeupdate', onTimeUpdate);
    videoEl.removeEventListener('loadeddata', onStart);
  };
};

// quartilesRx.js (unchanged code omitted)
...
const quartilesRx$ = (videoEl, sendQuartile) => {
  const subscription = getQuartiles$(videoEl).subscribe(sendQuartile);
  return () => subscription.unsubscribe();
};

If there’s one feature where I think RxJS really outshines the other implementation, it’s this. The one call to unsubscribe() takes care of any cleanup we might need, no matter how deeply nested in the Observable chain it is. This includes the initial fromEvent Observables, which in turn remove their underlying event listeners. It’s sort of like using destructors to handle memory management in C++; by using fromEvent to create those Observables, we guarantee they will be properly cleaned up once we terminate the subscription.

In the non-Rx version, we have to do the cleanup ourselves. The implementation can wrap the individual removeEventListeners as we do here, but we’re prone to forgetting one somewhere, especially as the function becomes more complex.
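
One way to guard against a forgotten removeEventListener is to assert on listener counts in a test. The sketch below assumes a hypothetical fake element (createFakeVideo) that records its listeners; a real test would more likely use jsdom or a spy. The handlers are stubbed out because only the wiring matters here.

```javascript
// Hypothetical fake element that records its listeners per event name.
const createFakeVideo = () => {
  const listeners = new Map();
  return {
    addEventListener(name, fn) {
      if (!listeners.has(name)) listeners.set(name, new Set());
      listeners.get(name).add(fn);
    },
    removeEventListener(name, fn) {
      if (listeners.has(name)) listeners.get(name).delete(fn);
    },
    listenerCount() {
      let total = 0;
      listeners.forEach(set => { total += set.size; });
      return total;
    },
  };
};

// Trimmed-down version of the imperative tracker: handlers are empty stubs.
const quartiles = videoEl => {
  const onTimeUpdate = () => {};
  const onStart = () => {};
  videoEl.addEventListener('timeupdate', onTimeUpdate);
  videoEl.addEventListener('loadeddata', onStart);
  return () => {
    videoEl.removeEventListener('timeupdate', onTimeUpdate);
    videoEl.removeEventListener('loadeddata', onStart);
  };
};

const fakeVideo = createFakeVideo();
const stopQuartilesFn = quartiles(fakeVideo);
console.log(fakeVideo.listenerCount()); // 2
stopQuartilesFn();
console.log(fakeVideo.listenerCount()); // 0
```

If one of the removeEventListener calls were deleted, the final count would be 1 instead of 0, which is exactly the kind of slip this check is meant to catch.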

Here are the final side-by-side implementations (I took the liberty of adding some throttling to the timeupdate events to demonstrate making this a little more feature-rich):

// quartiles.js (final)
const QUARTILE_INDEXES = [1, 2, 3];
const TIME_THRESHOLD = 3000;

const quartiles = (videoEl, sendQuartile) => {
  const indexesReached = new Set();
  let previousTime = Date.now();

  const onStart = () => {
    indexesReached.clear();
  };

  const onTimeUpdate = () => {
    if (Date.now() > previousTime + TIME_THRESHOLD) {
      previousTime = Date.now();
      const progress = videoEl.currentTime / videoEl.duration;
      QUARTILE_INDEXES.forEach(index => {
        if (progress > index / 4 && !indexesReached.has(index)) {
          indexesReached.add(index);
          sendQuartile(index);
        }
      });
    }
  };

  videoEl.addEventListener('timeupdate', onTimeUpdate);
  videoEl.addEventListener('loadeddata', onStart);

  return () => {
    videoEl.removeEventListener('timeupdate', onTimeUpdate);
    videoEl.removeEventListener('loadeddata', onStart);
  };
};

export default quartiles;

// quartilesRx.js (final)
import { map, mergeMap, filter, mapTo, take, switchMapTo, throttleTime } from 'rxjs/operators';
import { fromEvent, from } from 'rxjs';

const QUARTILE_INDEXES = [1, 2, 3];
const TIME_THRESHOLD = 3000;

const getProgress$ = videoEl => {
  return fromEvent(videoEl, 'timeupdate').pipe(map(() => {
    return videoEl.currentTime / videoEl.duration;
  }));
};

const getQuartile$ = progress$ => index => progress$.pipe(
  throttleTime(TIME_THRESHOLD),
  filter(progress => progress > index / 4),
  take(1),
  mapTo(index)
);

const getQuartiles$ = progress$ => {
  return from(QUARTILE_INDEXES).pipe(mergeMap(getQuartile$(progress$)));
};

const getQuartilesForVideos$ = videoEl => {
  const progress$ = getProgress$(videoEl);
  return fromEvent(videoEl, 'loadeddata').pipe(
    switchMapTo(getQuartiles$(progress$))
  );
};

const quartilesRx$ = (videoEl, sendQuartile) => {
  const subscription = getQuartilesForVideos$(videoEl).subscribe(sendQuartile);
  return () => subscription.unsubscribe();
};

export default quartilesRx$;

Conclusion

If you've made it this far, congrats! That was a lot to take in.

To be honest, I was pleasantly surprised at how few lines the non-Rx version managed to be. My hunch is that, when introducing quartiles into a larger application, the imperative version would start to get hairier, but so far it has proven manageable. So that raises the question: in what ways, if any, is the RxJS version better?

Interestingly, a pretty strong case could be made for the RxJS version if you look at it through the lens of a set of design principles typically associated with OOP: SOLID. If you bear with me just a little longer, I'll show some examples of how it applies here:

  • Single-responsibility: Whether we liked it or not, the RxJS version forced us to separate things into a hierarchy of abstraction levels, from the top-level subscribe() all the way down to the individual getQuartile$ Observables, each self-contained within its own function. The imperative version bundles it all together. One option for modularizing the imperative version would be to have two functions: one for the 'loadeddata' level and a lower-level one for all the 'timeupdate' logic. That would work, but it would at the very least make the cleanup more error-prone (see above).
  • Open-closed: Say we had another Observable (or EventEmitter) that we wanted to use in lieu of 'loadeddata' because it contained more relevant information (such as a video's title, metadata, etc.) for analytics. How would we go about replacing the old logic? In the non-Rx version, we would have to do some surgery on our function. Perhaps we could parameterize our start trigger. In the Rx version, we can add that functionality just by exporting our getQuartiles$ function, wrapping it in a different switchMap and subscribing to it.
  • Dependency Inversion: If you look at our implementations of getQuartiles$() and getQuartile$(), they actually don't know anything about videos. Instead they depend on progress$, which has to be an Observable that emits numbers. This isn't just a theoretical nicety: it enables us to pass in mock streams for tests, and to reuse the logic for other temporal components, such as HTML audio elements or custom video players with their own events APIs. Again, in the imperative version, we could break up the functions and parameterize the event emitters, but it would complicate the API and move the responsibility of cleanup to the client.

If after all that, you're no closer to deciding if RxJS is right for you, I'll leave you with one suggestion: Use Rx within some higher-level framework to give you clear direction on your larger project architecture, and introduce it piecemeal. Angular has RxJS baked into it. If you're in the React world, you should look into redux-observable. One of the hardest skills in RxJS is knowing when to (and not to) use it, and both of these frameworks make it much easier to draw those lines in your application.