Saturday, 20 October 2018

Understanding RxJS Observables

Reactive Extensions for JavaScript (RxJS) is a reactive streams library that allows you to work with asynchronous data streams. RxJS can be used both in the browser and on the server side using Node.js.

When version 2 of Angular came out, it introduced many of us to observables. The Observable isn’t an Angular-specific feature; it has been proposed as a standard way of managing async data in JavaScript, although that proposal is not part of the ECMAScript specification. Angular uses observables extensively in its event system and its HTTP service.

What is an Observable:
Observables are lazy collections of multiple values over time.

You can think of an observable as an array whose items arrive asynchronously over time. Observables help you manage asynchronous data, such as data coming from a backend service. Observables are used within Angular itself, including Angular’s event system and its HTTP client service. To work with observables, Angular relies on a third-party library, Reactive Extensions for JavaScript (RxJS).
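To make "lazy" a bit more concrete, here is a minimal, dependency-free sketch. `LazyObservable` is a hypothetical stand-in written for this post, not the RxJS `Observable` class; it only shows that the producer does nothing until someone subscribes.

```javascript
// Hypothetical stand-in for an Observable: it only stores the producer function.
class LazyObservable {
  constructor(producer) {
    this.producer = producer;
  }
  // The producer runs only when someone subscribes.
  subscribe(next) {
    this.producer({ next });
  }
}

const log = [];

const numbers = new LazyObservable((observer) => {
  log.push('producer started');
  observer.next(1);
  observer.next(2);
});

log.push('created'); // nothing has been produced yet
numbers.subscribe((value) => log.push('got ' + value));

console.log(log);
// [ 'created', 'producer started', 'got 1', 'got 2' ]
```

Note that 'created' is logged before 'producer started': creating the observable does not run the producer.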

An Observable by default is unicast. Unicasting means that each subscribed observer owns an independent execution of the Observable. To demonstrate this:

const observable = Rx.Observable.create((observer) => {
  observer.next(Math.random());
});

// subscription 1
observable.subscribe((data) => {
  console.log(data); // 0.24957144215097515 (random number)
});

// subscription 2
observable.subscribe((data) => {
  console.log(data); // 0.004617340049055896 (random number)
});

Subjects:
While Observables are unicast by design, this can be pretty annoying if you expect that each subscriber receives the same values. Subjects can help us overcome this issue.
Subjects can multicast. Multicasting basically means that one Observable execution is shared among multiple subscribers.

Subjects are like EventEmitters: they maintain a registry of many listeners. Calling subscribe on a Subject does not invoke a new execution that delivers data. It simply registers the given Observer in a list of Observers.
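The registry idea can be sketched in a few lines of plain JavaScript. `TinySubject` is a hypothetical, heavily simplified class made up for illustration (no error or completion handling); it is not how RxJS implements `Subject`, only the same basic shape.

```javascript
// Hypothetical, simplified Subject: a list of observers plus a next() that fans out.
class TinySubject {
  constructor() {
    this.observers = [];
  }
  // subscribe() only registers the observer; it starts no new execution.
  subscribe(next) {
    this.observers.push(next);
    // Return an unsubscribe function that removes the observer again.
    return () => {
      this.observers = this.observers.filter((o) => o !== next);
    };
  }
  // next() delivers the same value to every registered observer.
  next(value) {
    this.observers.forEach((observer) => observer(value));
  }
}

const tiny = new TinySubject();
const receivedA = [];
const receivedB = [];

tiny.subscribe((v) => receivedA.push(v));
const unsubscribeB = tiny.subscribe((v) => receivedB.push(v));

tiny.next('first');  // both observers receive 'first'
unsubscribeB();
tiny.next('second'); // only observer A is still registered

console.log(receivedA); // [ 'first', 'second' ]
console.log(receivedB); // [ 'first' ]
```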

Using Subjects to multicast:
Multicasting is a characteristic of a Subject. You don’t have to do anything special to achieve this behaviour. This is a small multicast demonstration:

const subject = new Rx.Subject();

// subscriber 1
subject.subscribe((data) => {
    console.log(data); // 0.24957144215097515 (random number)
});

// subscriber 2
subject.subscribe((data) => {
    console.log(data); // 0.24957144215097515 (random number)
});

subject.next(Math.random());

Nice! Now I have two subscriptions getting the same data. This, however, is not all that Subjects can do.

Whereas Observables are solely data producers, Subjects can be used as both a data producer and a data consumer. By using a Subject as a data consumer you can convert an Observable from unicast to multicast. Here’s a demonstration of that:

const observable = Rx.Observable.create((observer) => {
    observer.next(Math.random());
});

const subject = new Rx.Subject();

// subscriber 1
subject.subscribe((data) => {
    console.log(data); // 0.24957144215097515 (random number)
});

// subscriber 2
subject.subscribe((data) => {
    console.log(data); // 0.24957144215097515 (random number)
});

observable.subscribe(subject);

We pass our Subject to the subscribe function and let it take the values that come out of the Observable (data consuming). All the subscribers to that Subject then immediately receive that same value.

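mergeMap: Projects each source value to an inner observable and merges the values of all inner observables into a single output observable.

merge: Turns multiple observables (or Promises, in the older RxJS 4 style API used in the example below) into a single observable sequence that emits the values from all of them as they arrive.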
var source1 = Rx.Observable.interval(100)
    .timeInterval()
    .pluck('interval');
var source2 = Rx.Observable.interval(150)
    .timeInterval()
    .pluck('interval');

var source = Rx.Observable.merge(
    source1,
    source2)
    .take(5);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });

// => Next: 100
// => Next: 150
// => Next: 100
// => Next: 150
// => Next: 100
// => Completed
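The mergeMap entry above has no example of its own, so here is a rough, dependency-free sketch of the idea: each outer value is mapped to an inner observable, and every inner value is forwarded to one output. `createObservable`, `fromValues` and this `mergeMap` function are hypothetical helpers written for this post, not the RxJS API. With RxJS 6 you would write `source.pipe(mergeMap(project))` instead.

```javascript
// Hypothetical minimal observable: subscribe(observer) runs the producer.
function createObservable(producer) {
  return { subscribe: (observer) => producer(observer) };
}

// Hypothetical helper that emits the given values synchronously.
function fromValues(...values) {
  return createObservable((observer) => values.forEach((v) => observer.next(v)));
}

// Sketch of mergeMap: map each outer value to an inner observable and
// forward every inner value to the same output observer.
// Completion and error handling are left out to keep the sketch short.
function mergeMap(source, project) {
  return createObservable((observer) => {
    source.subscribe({
      next: (outer) =>
        project(outer).subscribe({ next: (inner) => observer.next(inner) }),
    });
  });
}

const letters = fromValues('a', 'b');
const flattened = [];

mergeMap(letters, (letter) => fromValues(letter + 1, letter + 2)).subscribe({
  next: (v) => flattened.push(v),
});

console.log(flattened); // [ 'a1', 'a2', 'b1', 'b2' ]
```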

concat: Concatenates all of the specified observable sequences, as long as the previous observable sequence terminated successfully.

/* Using Observable sequences */
var source1 = Rx.Observable.return(42);
var source2 = Rx.Observable.return(56);

var source = Rx.Observable.concat(source1, source2);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });

// => Next: 42
// => Next: 56
// => Completed

defer: Returns an observable sequence that invokes the specified factory function whenever a new observer subscribes.

/* Using an observable sequence */
var source = Rx.Observable.defer(function () {
    return Rx.Observable.return(42);
});

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });

// => Next: 42
// => Completed

do: Invokes an action for each element in the observable sequence and invokes an action upon graceful or exceptional termination of the observable sequence. In RxJS 6 the pipeable equivalent is called tap.

This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.

/* Using a function */
var source = Rx.Observable.range(0, 3)
  .do(
    function (x)   { console.log('Do Next:', x); },
    function (err) { console.log('Do Error:', err); },
    function ()    { console.log('Do Completed'); }
  );

var subscription = source.subscribe(
  function (x) {
    console.log('Next: %s', x);
  },
  function (err) {
    console.log('Error: %s', err);
  },
  function () {
    console.log('Completed');
  });

// => Do Next: 0
// => Next: 0
// => Do Next: 1
// => Next: 1
// => Do Next: 2
// => Next: 2
// => Do Completed
// => Completed

takeUntil: Returns the values from the source observable sequence until the other observable sequence or Promise produces a value.
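As a rough illustration of that behaviour, here is a dependency-free sketch. `createSubject` and this `takeUntil` function are hypothetical helpers made up for the example, not the RxJS implementation. With RxJS 6 the real thing is written as `source.pipe(takeUntil(notifier))`.

```javascript
// Hypothetical minimal subject used as both the source and the notifier.
function createSubject() {
  let observers = [];
  return {
    subscribe(observer) {
      observers.push(observer);
      // Return an unsubscribe function.
      return () => { observers = observers.filter((o) => o !== observer); };
    },
    next(value) {
      observers.slice().forEach((o) => o.next && o.next(value));
    },
  };
}

// Sketch of takeUntil: forward source values until the notifier emits once.
function takeUntil(source, notifier) {
  return {
    subscribe(observer) {
      let stopSource = () => {};
      let stopNotifier = () => {};
      stopNotifier = notifier.subscribe({
        next() {
          stopSource();
          stopNotifier();
          if (observer.complete) observer.complete();
        },
      });
      stopSource = source.subscribe({ next: (v) => observer.next(v) });
      return () => { stopSource(); stopNotifier(); };
    },
  };
}

const clicks = createSubject();
const stop = createSubject();
const seen = [];

takeUntil(clicks, stop).subscribe({
  next: (v) => seen.push(v),
  complete: () => seen.push('done'),
});

clicks.next(1);
clicks.next(2);
stop.next();    // notifier emits: the result completes
clicks.next(3); // ignored, the source subscription is gone

console.log(seen); // [ 1, 2, 'done' ]
```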

empty: Returns an empty observable sequence, using the specified scheduler to send out the single OnCompleted message.

var source = Rx.Observable.empty();

var subscription = source.subscribe(
  function (x) {
    console.log('Next: %s', x);
  },
  function (err) {
    console.log('Error: %s', err);
  },
  function () {
    console.log('Completed');
  });
  
// => Completed

catch: Continues an observable sequence that is terminated by an exception with the next observable sequence.

var obs1 = Rx.Observable.throw(new Error('error'));
var obs2 = Rx.Observable.return(42);

var source = Rx.Observable.catch(obs1, obs2);

var subscription = source.subscribe(
  function (x) {
    console.log('Next: %s', x);
  },
  function (err) {
    console.log('Error: %s', err);
  },
  function () {
    console.log('Completed');
  });

// => Next: 42
// => Completed

zip: Merges the specified observable sequences or Promises into one observable sequence by using the selector function whenever all of the observable sequences have produced an element at a corresponding index. If the result selector function is omitted, a list with the elements of the observable sequences at corresponding indexes will be yielded.

/* Without a result selector */
var range = Rx.Observable.range(0, 5);

var source = Rx.Observable.zip(
  range,
  range.skip(1),
  range.skip(2)
);

var subscription = source.subscribe(
  function (x) {
    console.log('Next: %s', x);
  },
  function (err) {
    console.log('Error: %s', err);
  },
  function () {
    console.log('Completed');
  });

// => Next: 0,1,2
// => Next: 1,2,3
// => Next: 2,3,4
// => Completed

map: Applies a projection to each value from the source.
// RxJS v6+
import { from } from 'rxjs';
import { map } from 'rxjs/operators';

//emit (1,2,3,4,5)
const source = from([1, 2, 3, 4, 5]);
//add 10 to each value
const example = source.pipe(map(val => val + 10));
//output: 11,12,13,14,15
const subscribe = example.subscribe(val => console.log(val));

pipe: You can use pipes to link operators together. Pipes let you combine multiple functions into a single function. The pipe() function takes as its arguments the functions you want to combine, and returns a new function that, when executed, runs the composed functions in sequence.
import { of, pipe } from 'rxjs';
import { filter, map } from 'rxjs/operators';

const nums = of(1, 2, 3, 4, 5);

// Create a function that accepts an Observable.
const squareOddVals = pipe(
  filter((n: number) => n % 2 !== 0),
  map(n => n * n)
);

// Create an Observable that will run the filter and map functions
const squareOdd = squareOddVals(nums);

// Subscribe to run the combined functions
squareOdd.subscribe(x => console.log(x));

filter: Emits only the values that pass the provided condition.
// RxJS v6+
import { from } from 'rxjs';
import { filter } from 'rxjs/operators';

//emit (1,2,3,4,5)
const source = from([1, 2, 3, 4, 5]);
//filter out non-even numbers
const example = source.pipe(filter(num => num % 2 === 0));
//output: "Even number: 2", "Even number: 4"
const subscribe = example.subscribe(val => console.log(`Even number: ${val}`));

takeWhile: Emits values until the provided expression is false, then completes.
// RxJS v6+
import { of } from 'rxjs';
import { takeWhile } from 'rxjs/operators';

//emit 1,2,3,4,5
const source = of(1, 2, 3, 4, 5);
//allow values until value from source is greater than 4, then complete
const example = source.pipe(takeWhile(val => val <= 4));
//output: 1,2,3,4
const subscribe = example.subscribe(val => console.log(val));
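
Difference between takeWhile() and filter(): takeWhile completes the stream as soon as a value fails the condition, while filter checks every value and only drops the ones that fail.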
// RxJS v6+
import { of } from 'rxjs';
import { takeWhile, filter } from 'rxjs/operators';

// emit 3, 3, 3, 9, 1, 4, 5, 8, 96, 3, 66, 3, 3, 3
const source = of(3, 3, 3, 9, 1, 4, 5, 8, 96, 3, 66, 3, 3, 3);

// allow values while value from source equals 3, then complete at the first other value
// output: [3, 3, 3]
source
  .pipe(takeWhile(it => it === 3))
  .subscribe(val => console.log('takeWhile', val));

// output: [3, 3, 3, 3, 3, 3, 3]
source
  .pipe(filter(it => it === 3))
  .subscribe(val => console.log('filter', val));

from: Converts various other objects and data types into Observables.
// RxJS v6+
import { from } from 'rxjs';

//emit array as a sequence of values
const arraySource = from([1, 2, 3, 4, 5]);
//output: 1,2,3,4,5
const subscribe = arraySource.subscribe(val => console.log(val));

of: Emits a variable number of values in a sequence.
// RxJS v6+
import { of } from 'rxjs';
//emits any number of provided values in sequence
const source = of(1, 2, 3, 4, 5);
//output: 1,2,3,4,5
const subscribe = source.subscribe(val => console.log(val));

catchError: Gracefully handle errors in an observable sequence.
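Here is a small, dependency-free sketch of what "gracefully handle" means: when the source errors, the handler supplies a replacement observable and the subscriber never sees the error. `createObservable`, `fallback` and this `catchError` function are hypothetical helpers written for illustration, not the RxJS implementation. With RxJS 6 you would write `source.pipe(catchError(handler))`.

```javascript
// Hypothetical minimal observable: subscribe(observer) runs the producer.
function createObservable(producer) {
  return { subscribe: (observer) => producer(observer) };
}

// Sketch of catchError: on error, switch to the observable returned by the handler.
function catchError(source, handler) {
  return createObservable((observer) => {
    source.subscribe({
      next: (v) => observer.next(v),
      error: (err) => handler(err).subscribe(observer),
      complete: () => observer.complete(),
    });
  });
}

// A source that emits one value and then fails.
const failing = createObservable((observer) => {
  observer.next(1);
  observer.error(new Error('boom'));
});

// The handler receives the error and returns a replacement observable.
const fallback = (err) =>
  createObservable((observer) => {
    observer.next('recovered from ' + err.message);
    observer.complete();
  });

const output = [];
catchError(failing, fallback).subscribe({
  next: (v) => output.push(v),
  error: (err) => output.push('unhandled: ' + err.message),
  complete: () => output.push('completed'),
});

console.log(output); // [ 1, 'recovered from boom', 'completed' ]
```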

dispatchEvent:

Read more https://github.com/Reactive-Extensions/operators



