angular - Rxjs
Rxjs
//Observable(可观察者):表示未来(future)值或事件的可调用集合的概念。
const observable = new Observable((subscriber) => {
setTimeout(() => {
subscriber.next({ name: '张三' });
subscriber.complete();
}, 2000);
});
// Observer(观察者):是一个回调集合,它知道如何监听 Observable 传来的值。
const observer = {
next: function (x: any) {
console.log(x);
}
}
// Subscription(订阅):表示 Observable 的一次执行,主要用于取消执行。
observable.subscribe(observer);
console.log('just after subscribe');
可观察对象(Observable)/观察者(Observer)/订阅(Subscribe)
(1)可多次调用next
发送数据
//Observable(可观察者):表示未来(future)值或事件的可调用集合的概念。
const observable = new Observable((subscriber) => {
setTimeout(() => {
subscriber.next({ name: '张三' });
subscriber.complete();
}, 2000);
});
// Observer(观察者):是一个回调集合,它知道如何监听 Observable 传来的值。
const observer = {
next: function (x: any) {
console.log(x);
}
}
// Subscription(订阅):表示 Observable 的一次执行,主要用于取消执行。
observable.subscribe(observer);
console.log('subscribe 之后');
(2)当完成使用 complete
结束调用
//Observable(可观察者):表示未来(future)值或事件的可调用集合的概念。
const observable = new Observable((subscriber) => {
let index = 0;
let timer = setInterval(() => {
subscriber.next(index++);
if (index == 5) {
subscriber.complete();
clearInterval(timer);
}
}, 1000);
});
// Observer(观察者):是一个回调集合,它知道如何监听 Observable 传来的值。
const observer = {
next: function (x: any) {
console.log(x);
},
complete: () => {
console.log('完成');
}
}
// Subscription(订阅):表示 Observable 的一次执行,主要用于取消执行。
observable.subscribe(observer);
console.log('subscribe 之后');
(3)error
:内部逻辑错误,将失败信息发给订阅者。Observerable 终止。
//Observable(可观察者):表示未来(future)值或事件的可调用集合的概念。
const observable = new Observable((subscriber) => {
let index = 0;
let timer = setInterval(() => {
subscriber.next(index++);
if (index == 5) {
// subscriber.complete();
subscriber.error("失败了~");
clearInterval(timer);
}
}, 1000);
});
// Observer(观察者):是一个回调集合,它知道如何监听 Observable 传来的值。
const observer = {
next: function (x: any) {
console.log(x);
},
error: (x: string) =>{
console.log(x);
},
complete: () => {
console.log('完成');
}
}
// Subscription(订阅):表示 Observable 的一次执行,主要用于取消执行。
observable.subscribe(observer);
console.log('subscribe 之后');
Subject 构造
订阅立即执行 | 传参 | 广播历史 | |
---|---|---|---|
Subject | 否 | 否 | 否 |
BehaviorSubject | ✔ | ✔ | 否 |
ReplaySubject | 否 | 否 | ✔ |
用于创建可观察对象,但订阅后不会立刻执行,next 可以在可观察对象外部调用
const demoSubject = new Subject()
demoSubject.subscribe({
next: (x: any) => {
console.log(x);
}
})
demoSubject.subscribe({
next: (x: any) => {
console.log(x);
}
})
console.log('没调用');
setTimeout(() => {
demoSubject.next("222") //2秒后调用
}, 2000);
BehaviorSubject:可传参的 Subject
,而且立即调用
const behaviorSubject = new BehaviorSubject('默认值~');
behaviorSubject.subscribe({
next: (x: any) => {
console.log(x);
}
})
behaviorSubject.next('改变值');
ReplaySubject:它通过在新订阅者首次订阅时发送旧值来“重播”旧值。
ReplaySubject
会广播历史结果,而Subject
不会广播历史结果
import { ReplaySubject } from 'rxjs';
const replaySubject = new ReplaySubject();
replaySubject.subscribe({
next: (x: any) => {
console.log(x);
}
})
replaySubject.next('hello 1')
replaySubject.next('hello 2')
setTimeout(() => {
//过2秒再订阅
replaySubject.subscribe({
next: (x: any) => {
console.log(x);
}
})
}, 3000);
操作符
操作符是 Observable 类型上的方法,比如 .map(...)
、.filter(...)
、.merge(...)
,等等。当操作符被调用时,它们不会改变已经存在的 Observable 实例。相反,它们返回一个新的 Observable ,它的 subscription 逻辑基于第一个 Observable 。
操作符是函数,它基于当前的 Observable 创建一个新的 Observable。这是一个无副作用的操作:前面的 Observable 保持不变。
创建操作符
range 指定范围内的数字序列
range(start: number, count: number, scheduler: Scheduler): Observable
创建一个 Observable ,它发出指定范围内的数字序列。
import { range } from 'rxjs';
range(0,10).subscribe((x)=>console.log(x))
from (ObservableInput) 创建一个 Observable
从一个 ObservableInput (数组、类数组对象、Promise、迭代器对象或者类 Observable 对象) 创建一个 Observable.
from([10, 20, 30]).subscribe(x => console.log(x));
//10
//20
//30
of 根据提供的参数创建 Observable
发出你提供的参数,然后完成。
of([1, 2, 3]).subscribe(x => console.log(x));
//打印 [1,2,3]
of('a',1,[1, 2, 3]).subscribe(x => console.log(x));
//'a'
//1
//[1, 2, 3]
fromEvent 发出来自给定事件对象的指定类型事件
创建一个 Observable,该 Observable 发出来自给定事件对象的指定类型事件。
通过给“事件目标”添加事件监听器的方式创建 Observable,可能会是拥有addEventListener
和 removeEventListener
方法的对象,一个 Node.js 的 EventEmitter,一个 jQuery 式的 EventEmitter, 一个 DOM 的节点集合, 或者 DOM 的 HTMLCollection。 当输出 Observable 被订阅的时候事件处理函数会被添加, 当取消订阅的时候会将事件处理函数移除。
名称 | 类型 | 属性 | 描述 |
---|---|---|---|
target | EventTargetLike | DOMElement, 事件目标, Node.js EventEmitter, NodeList 或者 HTMLCollection 等附加事件处理方法的对象。 | |
eventName | string | 感兴趣的事件名称, 被 target 发出。 | |
options | EventListenerOptions | - 可选的 | 可选的传递给 addEventListener 的参数。 |
selector | SelectorMethodSignature | - 可选的 | 可选的函数处理结果. 接收事件处理函数的参数,应该返回单个值。 |
fromEvent(document, 'click').subscribe(x => console.log(x));
// 结果:
// 每次点击 document 时,都会在控制台上输出 MouseEvent 。
angular
html
<button id="btn">提交</button>
app.component.ts
ngOnInit() {
const btn = document.getElementById('btn')
if (btn != null) {
fromEvent(btn, 'click')
.pipe(map(event => event.target))
.subscribe(console.log);
}
}
interval 定期发出自增的数字
每隔一段时间发送数值,数值递增
interval(1000).subscribe(console.log)
timer 指定时间的 interval
就像是interval, 但是你可以指定什么时候开始发送
timer(3000,1000).subscribe(console.log) //像interval,但是这里可以设置过3秒
转换操作符
map 数据转换
基于数据流,进行数据转换。
import { map, range } from 'rxjs';
range(0,10)
.pipe(map(x=>x*10))
.subscribe((x)=>console.log(x))
swtichMap 切换可观察对象
它仍然提供一个Observable作为输出,不是通过合并,而是通过仅从最新的Observable 发出的结果。
重点突出在切换最后1次Observeble.
const btn = document.getElementById('btn')
if (btn != null) {
fromEvent(btn, 'click')
.pipe(switchMap(event => interval(1000)))
.subscribe(console.log);
}
对于我们的最后一个示例,如果我们使用switchMap,我们只会从最后一个Observable 中获取结果。
const dymiAPI = (character:any): any => {
return of(`API response for character: ${character}`).pipe(delay(1000))
}
from(['aa', 'bb', 'cc', 'dd']).pipe(
switchMap(arr => dymiAPI(arr))
).subscribe(data => console.log(data)) //这里只会输出最新的 dd
//结果:
//API response for character: dd
pluck(rxjs v8要弃用)
获取属性值
ngOnInit() {
const btn = document.getElementById('btn')
if (btn != null) {
fromEvent(btn, 'click')
// .pipe(map(event => event.target))
.pipe(pluck('target'))
.subscribe(console.log);
}
}
过滤操作符
take
获取最前的N个
interval(1000).pipe(take(2)).subscribe(console.log)
takeWhile
满足 predicate
函数的每个值,并且一旦出现不满足 predicate
的值就立即完成
range(1, 10).pipe(takeWhile(x => x < 5)).subscribe(console.log) //1,2,3,4
fromEvent(document, 'mousemove')
.pipe( takeWhile((Event: any) => Event.clientX > 200))
.subscribe(console.log) //打印 mousemove 事件的 event,鼠标移动到clientX 小于 200 的时候,停止
takeUntil
直到 notifier
Observable 发出值
const btn = document.getElementById('btn')
if (btn != null) {
fromEvent(document, 'mousemove')
.pipe(takeUntil(fromEvent(btn, 'click')))
.subscribe(console.log) //打印 mousemove 事件的event,点击 btn,停止
}
throttleTime 限流
节流,可观察对象高频次发送数据流,限定时间内只能向订阅者发送一次数据流。
fromEvent(document, 'click')
.pipe(throttleTime(2000))
.subscribe(console.log) //鼠标不停点击浏览器内容,2秒内只执行一次
debounceTime 防抖
防抖,高频次发送数据流,只响应最后一次。
fromEvent(document, 'click')
.pipe(debounceTime(2000))
.subscribe(console.log)
点浏览器内容N次,过2秒执行最后1次(最新1次)的结果
点击1次
distinctUntilChanged 检测数据流是否和上次相同
检测数据源发出的数据流是否和上次一样,相同就忽略,不相同就发出。
组合操作符
forkjoin 合并多个请求,直到发出的最后一个值
类似 Promise.all(),等待 Observables 完成,然后合并它们发出的最后一个值。
import { forkJoin, of, timer } from 'rxjs';
const observable = forkJoin({
foo: of(1, 2, 3, 4),
bar: Promise.resolve(8),
baz: timer(4000)
});
observable.subscribe({
next: value => console.log(value),
complete: () => console.log('This is how it ends!'),
});
// Logs:
// { foo: 4, bar: 8, baz: 0 } ==> after 4 seconds
// 'This is how it ends!' ==> 紧接着