浅谈Python响应式类库RxPy

 更新时间:2021年6月14日 15:00  点击:1444

一、基本概念

Reactive X中有几个核心的概念,先来简单介绍一下。

1.1、Observable和Observer(可观察对象和观察者)

首先是Observable和Observer,它们分别是可观察对象和观察者。Observable可以理解为一个异步的数据源,会发送一系列的值。Observer则类似于消费者,需要先订阅Observable,然后才可以接收到其发射的值。可以说这组概念是设计模式中的观察者模式和生产者-消费者模式的综合体。

1.2、Operator(操作符)

另外一个非常重要的概念就是操作符了。操作符作用于Observable的数据流上,可以对其施加各种各样的操作。更重要的是,操作符还可以链式组合起来。这样的链式函数调用不仅将数据和操作分隔开来,而且代码更加清晰可读。一旦熟练掌握之后,你就会爱上这种感觉的。

1.3、Single(单例)

在RxJava和其变体中,还有一个比较特殊的概念叫做Single,它是一种只会发射同一个值的Observable,说白了就是单例。当然如果你对Java等语言比较熟悉,那么单例想必也很熟悉。

1.4、Subject(主体)

主体这个概念非常特殊,它既是Observable又是Observer。正是因为这个特点,所以Subject可以订阅其他Observable,也可以将发射对象给其他Observer。在某些场景中,Subject会有很大的作用。

1.5、Scheduler(调度器)

默认情况下Reactive X只运行在当前线程下,但是如果有需要的话,也可以用调度器来让Reactive X运行在多线程环境下。有很多调度器和对应的操作符,可以处理多线程场景下的各种要求。

1.6、Observer和Observable

先来看看一个最简单的例子,运行的结果会依次打印这些数字。这里的of是一个操作符,可以根据给定的参数创建一个新的Observable。创建之后,就可以订阅Observable,三个回调方法在对应的时机执行。一旦Observer订阅了Observable,就会接收到后续Observable发射的各项值。

from rx import of

ob = of(1, 2, 34, 5, 6, 7, 7)
ob.subscribe(
    on_next=lambda i: print(f'Received: {i}'),
    on_error=lambda e: print(f'Error: {e}'),
    on_completed=lambda: print('Completed')

)

这个例子看起来好像很简单,并且看起来没什么用。但是当你了解了Rx的一些核心概念,就会理解到这是一个多么强大的工具。更重要的是,Observable生成数据和订阅的过程是异步的,如果你熟悉的话,就可以利用这个特性做很多事情。

1.7、操作符

在RxPy中另一个非常重要的概念就是操作符了,甚至可以说操作符就是最重要的一个概念了。几乎所有的功能都可以通过组合各个操作符来实现。熟练掌握操作符就是学好RxPy的关键了。操作符之间也可以用pipe函数连接起来,构成复杂的操作链。

from rx import of, operators as op
import rx

ob = of(1, 2, 34, 5, 6, 7, 7)
ob.pipe(
    op.map(lambda i: i ** 2),
    op.filter(lambda i: i >= 10)
).subscribe(lambda i: print(f'Received: {i}'))

在RxPy中有大量操作符,可以完成各种各样的功能。我们来简单看看其中一些常用的操作符。如果你熟悉Java8的流类库或者其他函数式编程类库的话,应该对这些操作符感到非常亲切。

1.8、创建型操作符

首先是创建Observable的操作符,列举了一些比较常用的创建型操作符。

1.9、过滤型操作符

过滤型操作符的主要作用是对Observable进行筛选和过滤。

1.10、转换型操作符

1.11、算术操作符

1.12、Subject

Subject是一种特殊的对象,它既是Observer又是Observable。不过这个对象一般不太常用,但是假如某些用途还是很有用的。所以还是要介绍一下。下面的代码,因为订阅的时候第一个值已经发射出去了,所以只会打印订阅之后才发射的值。

from rx.subject import Subject, AsyncSubject, BehaviorSubject, ReplaySubject

# Subject同时是Observer和Observable

print('--------Subject---------')
subject = Subject()
subject.on_next(1)
subject.subscribe(lambda i: print(i))
subject.on_next(2)
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 2 3 4

另外还有几个特殊的Subject,下面来介绍一下。

1.13、ReplaySubject

ReplaySubject是一个特殊的Subject,它会记录所有发射过的值,不论什么时候订阅的。所以它可以用来当做缓存来使用。ReplaySubject还可以接受一个bufferSize参数,指定可以缓存的最近数据数,默认情况下是全部。

下面的代码和上面的代码几乎完全一样,但是因为使用了ReplaySubject,所以所有的值都会被打印。当然大家也可以试试把订阅语句放到其他位置,看看输出是否会产生变化。

# ReplaySubject会缓存所有值,如果指定参数的话只会缓存最近的几个值
print('--------ReplaySubject---------')
subject = ReplaySubject()
subject.on_next(1)
subject.subscribe(lambda i: print(i))
subject.on_next(2)
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 1 2 3 4

1.14、BehaviorSubject

BehaviorSubject是一个特殊的Subject,它只会记录最近一次发射的值。而且在创建它的时候,必须指定一个初始值,所有订阅它的对象都可以接收到这个初始值。当然如果订阅的晚了,这个初始值同样会被后面发射的值覆盖,这一点要注意。

# BehaviorSubject会缓存上次发射的值,除非Observable已经关闭
print('--------BehaviorSubject---------')
subject = BehaviorSubject(0)
subject.on_next(1)
subject.on_next(2)
subject.subscribe(lambda i: print(i))
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 2 3 4

1.15、AsyncSubject

AsyncSubject是一个特殊的Subject,顾名思义它是一个异步的Subject,它只会在Observer完成的时候发射数据,而且只会发射最后一个数据。因此下面的代码仅仅会输出4.假如注释掉最后一行co_completed调用,那么什么也不会输出。

# AsyncSubject会缓存上次发射的值,而且仅会在Observable关闭后开始发射
print('--------AsyncSubject---------')
subject = AsyncSubject()
subject.on_next(1)
subject.on_next(2)
subject.subscribe(lambda i: print(i))
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 4

1.16、Scheduler

虽然RxPy算是异步的框架,但是其实它默认还是运行在单个线程之上的,因此如果使用了某些会阻碍线程运行的操作,那么程序就会卡死。当然针对这些情况,我们就可以使用其他的Scheduler来调度任务,保证程序能够高效运行。

下面的例子创建了一个ThreadPoolScheduler,它是基于线程池的调度器。两个Observable用subscribe_on方法指定了调度器,因此它们会使用不同的线程来工作。

import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as op

import multiprocessing
import time
import threading
import random


def long_work(value):
    time.sleep(random.randint(5, 20) / 10)
    return value


pool_schedular = ThreadPoolScheduler(multiprocessing.cpu_count())

rx.range(5).pipe(
    op.map(lambda i: long_work(i + 1)),
    op.subscribe_on(pool_schedular)
).subscribe(lambda i: print(f'Work 1: {threading.current_thread().name}, {i}'))

rx.of(1, 2, 3, 4, 5).pipe(
    op.map(lambda i: i * 2),
    op.subscribe_on(pool_schedular)
).subscribe(lambda i: print(f'Work 2: {threading.current_thread().name}, {i}'))

如果你观察过各个操作符的API的话,可以发现大部分操作符都支持可选的Scheduler参数,为操作符指定一个调度器。如果操作符上指定了调度器的话,会优先使用这个调度器;其次的话,会使用subscribe方法上指定的调度器;如果以上都没有指定的话,就会使用默认的调度器。

二、应用场景

好了,介绍了一些Reactive X的知识之后,下面来看看如何来使用Reactive X。在很多应用场景下,都可以利用Reactive X来抽象数据处理,把概念简单化。

2.1、防止重复发送

很多情况下我们都需要控制事件的发生间隔,比如有一个按钮不小心按了好几次,只希望第一次按钮生效。这种情况下可以使用debounce操作符,它会过滤Observable,小于指定时间间隔的数据会被过滤掉。debounce操作符会等待一段时间,直到过了间隔时间,才会发射最后一次的数据。如果想要过滤后面的数据,发送第一次的数据,则要使用throttle_first操作符。

下面的代码可以比较好的演示这个操作符,快速按回车键发送数据,注意观察按键和数据显示之间的关系,还可以把throttle_first操作符换成debounce操作符,然后再看看输出会发生什么变化,还可以完全注释掉pipe中的操作符,再看看输出会有什么变化。

import rx
from rx import operators as op
from rx.subject import Subject
import datetime

# debounce操作符,仅在时间间隔之外的可以发射

ob = Subject()
ob.pipe(
    op.throttle_first(3)
    # op.debounce(3)
).subscribe(
    on_next=lambda i: print(i),
    on_completed=lambda: print('Completed')
)

print('press enter to print, press other key to exit')
while True:
    s = input()
    if s == '':
        ob.on_next(datetime.datetime.now().time())
    else:
        ob.on_completed()
        break

2.2、操作数据流

如果需要对一些数据进行操作,那么同样有一大堆操作符可以满足需求。当然这部分功能并不是Reactive X独有的,如果你对Java 8的流类库有所了解,会发现这两者这方面的功能几乎是完全一样的。

下面是个简单的例子,将两个数据源结合起来,然后找出来其中所有的偶数。

import rx
from rx import operators as op
from rx.subject import Subject
import datetime

# 操作数据流
some_data = rx.of(1, 2, 3, 4, 5, 6, 7, 8)
some_data2 = rx.from_iterable(range(10, 20))
some_data.pipe(
    op.merge(some_data2),
    op.filter(lambda i: i % 2 == 0),
    # op.map(lambda i: i * 2)
).subscribe(lambda i: print(i))

再或者一个利用reduce的简单例子,求1-100的整数和。

import rx
from rx import operators as op
from rx.subject import Subject
import datetime

rx.range(1, 101).pipe(
    op.reduce(lambda acc, i: acc + i, 0)
).subscribe(lambda i: print(i))

以上就是浅谈Python响应式类库RxPy的详细内容,更多关于Python响应式类库RxPy的资料请关注猪先飞其它相关文章!

[!--infotagslink--]

相关文章

  • python opencv 画外接矩形框的完整代码

    这篇文章主要介绍了python-opencv-画外接矩形框的实例代码,代码简单易懂,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...2021-09-04
  • Python astype(np.float)函数使用方法解析

    这篇文章主要介绍了Python astype(np.float)函数使用方法解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下...2020-06-08
  • 最炫Python烟花代码全解析

    2022虎年新年即将来临,小编为大家带来了一个利用Python编写的虎年烟花特效,堪称全网最绚烂,文中的示例代码简洁易懂,感兴趣的同学可以动手试一试...2022-02-14
  • python中numpy.empty()函数实例讲解

    在本篇文章里小编给大家分享的是一篇关于python中numpy.empty()函数实例讲解内容,对此有兴趣的朋友们可以学习下。...2021-02-06
  • python-for x in range的用法(注意要点、细节)

    这篇文章主要介绍了python-for x in range的用法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2021-05-10
  • Python 图片转数组,二进制互转操作

    这篇文章主要介绍了Python 图片转数组,二进制互转操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2021-03-09
  • Python中的imread()函数用法说明

    这篇文章主要介绍了Python中的imread()函数用法说明,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2021-03-16
  • python实现b站直播自动发送弹幕功能

    这篇文章主要介绍了python如何实现b站直播自动发送弹幕,帮助大家更好的理解和学习使用python,感兴趣的朋友可以了解下...2021-02-20
  • python Matplotlib基础--如何添加文本和标注

    这篇文章主要介绍了python Matplotlib基础--如何添加文本和标注,帮助大家更好的利用Matplotlib绘制图表,感兴趣的朋友可以了解下...2021-01-26
  • 解决python 使用openpyxl读写大文件的坑

    这篇文章主要介绍了解决python 使用openpyxl读写大文件的坑,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2021-03-13
  • python 计算方位角实例(根据两点的坐标计算)

    今天小编就为大家分享一篇python 计算方位角实例(根据两点的坐标计算),具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2020-04-27
  • python实现双色球随机选号

    这篇文章主要为大家详细介绍了python实现双色球随机选号,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...2020-05-02
  • python中使用np.delete()的实例方法

    在本篇文章里小编给大家整理的是一篇关于python中使用np.delete()的实例方法,对此有兴趣的朋友们可以学习参考下。...2021-02-01
  • 使用Python的pencolor函数实现渐变色功能

    这篇文章主要介绍了使用Python的pencolor函数实现渐变色功能,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...2021-03-09
  • Python getsizeof()和getsize()区分详解

    这篇文章主要介绍了Python getsizeof()和getsize()区分详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2020-11-20
  • python自动化办公操作PPT的实现

    这篇文章主要介绍了python自动化办公操作PPT的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2021-02-05
  • 解决python 两个时间戳相减出现结果错误的问题

    这篇文章主要介绍了解决python 两个时间戳相减出现结果错误的问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2021-03-12
  • python实现学生通讯录管理系统

    这篇文章主要为大家详细介绍了python实现学生通讯录管理系统,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...2021-02-25
  • PyTorch一小时掌握之迁移学习篇

    这篇文章主要介绍了PyTorch一小时掌握之迁移学习篇,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...2021-09-08
  • Python绘制的爱心树与表白代码(完整代码)

    这篇文章主要介绍了Python绘制的爱心树与表白代码,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...2021-04-06