Project Icon

Pipe

Python数据处理的中缀编程工具包

Pipe是一个Python模块,提供类似Shell的中缀语法编程能力。通过管道操作符'|'连接数据处理函数,实现高效的数据处理流程。该工具支持惰性求值、函数别名和部分初始化,内置多种数据操作函数如过滤、映射、分组等。Pipe允许创建自定义管道函数,适用于处理大型数据集和复杂数据流,有助于提高代码可读性和可维护性。

Pipe — 中缀编程工具包

PyPI 每月下载量 支持的Python版本 GitHub工作流状态

启用类似sh的中缀语法(使用管道)的模块。

简介

例如,这是欧拉计划第2题的解决方案:

找出斐波那契数列中不超过四百万的所有偶数项之和。

假设fib是一个斐波那契数生成器:

sum(fib() | where(lambda x: x % 2 == 0) | take_while(lambda x: x < 4000000))

每个管道都是惰性求值的,可以被别名化,并且可以部分初始化,所以它可以重写为:

is_even = where(lambda x: x % 2 == 0)
sum(fib() | is_even | take_while(lambda x: x < 4000000)

安装

要安装这个库,你只需运行以下命令:

# Linux/macOS
python3 -m pip install pipe

# Windows
py -3 -m pip install pipe

使用

基本语法是像在shell中一样使用|

>>> from itertools import count
>>> from pipe import select, take
>>> sum(count() | select(lambda x: x ** 2) | take(10))
285
>>>

一些管道需要一个参数:

>>> from pipe import where
>>> sum([1, 2, 3, 4] | where(lambda x: x % 2 == 0))
6
>>>

有些不需要参数:

>>> from pipe import traverse
>>> for i in [1, [2, 3], 4] | traverse:
...     print(i)
1
2
3
4
>>>

在这种情况下,允许使用调用括号:

>>> from pipe import traverse
>>> for i in [1, [2, 3], 4] | traverse():
...     print(i)
1
2
3
4
>>>

本模块中现有的管道

按字母顺序列出可用的管道;当一个管道有多个名称时,这些是别名。

batched

类似Python 3.12的itertool.batched

>>> from pipe import batched
>>> list("ABCDEFG" | batched(3))
[('A', 'B', 'C'), ('D', 'E', 'F'), ('G',)]
>>>

chain

链接一系列可迭代对象:

>>> from pipe import chain
>>> list([[1, 2], [3, 4], [5]] | chain)
[1, 2, 3, 4, 5]
>>>

警告:chain只展开仅包含可迭代对象的可迭代对象:

list([1, 2, [3]] | chain)

会引发TypeError: 'int' object is not iterable 考虑使用traverse。

chain_with(other)

类似itertools.chain,先产生给定可迭代对象的元素,然后产生其参数的元素

>>> from pipe import chain_with
>>> list((1, 2, 3) | chain_with([4, 5], [6]))
[1, 2, 3, 4, 5, 6]
>>>

dedup(key=None)

去除重复值,如果提供了key函数则使用它。

>>> from pipe import dedup
>>> list([-1, 0, 0, 0, 1, 2, 3] | dedup)
[-1, 0, 1, 2, 3]
>>> list([-1, 0, 0, 0, 1, 2, 3] | dedup(key=abs))
[-1, 0, 2, 3]
>>>

enumerate(start=0)

内置的enumerate()作为一个Pipe:

>>> from pipe import enumerate
>>> list(['apple', 'banana', 'citron'] | enumerate)
[(0, 'apple'), (1, 'banana'), (2, 'citron')]
>>> list(['car', 'truck', 'motorcycle', 'bus', 'train'] | enumerate(start=6))
[(6, 'car'), (7, 'truck'), (8, 'motorcycle'), (9, 'bus'), (10, 'train')]
>>>

filter(predicate)

where(predicate)的别名,见where(predicate)

groupby(key=None)

类似itertools.groupby(sorted(iterable, key = keyfunc), keyfunc)

>>> from pipe import groupby, map
>>> items = range(10)
>>> ' / '.join(items | groupby(lambda x: "Odd" if x % 2 else "Even")
...                  | select(lambda x: "{}: {}".format(x[0], ', '.join(x[1] | map(str)))))
'Even: 0, 2, 4, 6, 8 / Odd: 1, 3, 5, 7, 9'
>>>

islice()

就是itertools.islice函数作为一个Pipe:

>>> from pipe import islice
>>> list((1, 2, 3, 4, 5, 6, 7, 8, 9) | islice(2, 8, 2))
[3, 5, 7]
>>>

izip()

就是itertools.izip函数作为一个Pipe:

>>> from pipe import izip
>>> list(range(0, 10) | izip(range(1, 11)))
[(0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 9), (9, 10)]
>>>

map(), select()

将作为参数给出的转换表达式应用于给定可迭代对象的每个元素

>>> list([1, 2, 3] | map(lambda x: x * x))
[1, 4, 9]

>>> list([1, 2, 3] | select(lambda x: x * x))
[1, 4, 9]
>>>

netcat

netcat Pipe通过TCP发送和接收字节:

data = [
    b"HEAD / HTTP/1.0\r\n",
    b"Host: python.org\r\n",
    b"\r\n",
]
for packet in data | netcat("python.org", 80):
    print(packet.decode("UTF-8"))

输出:

HTTP/1.1 301 Moved Permanently
Content-length: 0
Location: https://python.org/
Connection: close

permutations(r=None)

返回所有可能的排列:

>>> from pipe import permutations
>>> for item in 'ABC' | permutations(2):
...     print(item)
('A', 'B')
('A', 'C')
('B', 'A')
('B', 'C')
('C', 'A')
('C', 'B')
>>>
>>> for item in range(3) | permutations:
...     print(item)
(0, 1, 2)
(0, 2, 1)
(1, 0, 2)
(1, 2, 0)
(2, 0, 1)
(2, 1, 0)
>>>

reverse

类似Python的内置reversed函数。

>>> from pipe import reverse
>>> list([1, 2, 3] | reverse)
[3, 2, 1]
>>>

select(fct)

map(fct)的别名,见map(fct)

skip()

从给定的可迭代对象中跳过给定数量的元素,然后产生剩余元素

>>> from pipe import skip
>>> list((1, 2, 3, 4, 5) | skip(2))
[3, 4, 5]
>>>

skip_while(predicate)

类似itertools.dropwhile,当谓词为真时跳过给定可迭代对象的元素,然后产生其他元素:

>>> from pipe import skip_while
>>> list([1, 2, 3, 4] | skip_while(lambda x: x < 3))
[3, 4]
>>>

sort(key=None, reverse=False)

类似Python的内置"sorted"原语。

>>> from pipe import sort
>>> ''.join("python" | sort)
'hnopty'
>>> [5, -4, 3, -2, 1] | sort(key=abs)
[1, -2, 3, -4, 5]
>>>

t

类似Haskell的操作符":":

>>> from pipe import t
>>> for i in 0 | t(1) | t(2):
...     print(i)
0
1
2
>>>

tail(n)

产生给定可迭代对象的最后n个元素。

>>> from pipe import tail
>>> for i in (1, 2, 3, 4, 5) | tail(3):
...     print(i)
3
4
5
>>>

take(n)

从给定的可迭代对象中产生给定数量的元素,类似shell脚本中的head

>>> from pipe import take
>>> for i in count() | take(5):
...     print(i)
0
1
2
3
4
>>>

take_while(predicate)

类似itertools.takewhile,当谓词为真时产生给定可迭代对象的元素:

>>> from pipe import take_while
>>> for i in count() | take_while(lambda x: x ** 2 < 100):
...     print(i)
0
1
2
3
4
5
6
7
8
9
>>>

tee

tee 输出到标准输出并产生未更改的项,对于逐步调试管道很有用:

>>> from pipe import tee
>>> sum(["1", "2", "3", "4", "5"] | tee | map(int) | tee)
'1'
1
'2'
2
'3'
3
'4'
4
'5'
5
15
>>>

末尾的 15sum 的返回值。

transpose()

转置矩阵的行和列。

>>> from pipe import transpose
>>> [[1, 2, 3], [4, 5, 6], [7, 8, 9]] | transpose
[(1, 4, 7), (2, 5, 8), (3, 6, 9)]
>>>

traverse

递归展开可迭代对象:

>>> list([[1, 2], [[[3], [[4]]], [5]]] | traverse)
[1, 2, 3, 4, 5]
>>> squares = (i * i for i in range(3))
>>> list([[0, 1, 2], squares] | traverse)
[0, 1, 2, 0, 1, 4]
>>>

uniq(key=None)

类似于 dedup(),但只对连续的值去重,如果提供了 key 函数则使用该函数(否则使用恒等函数)。

>>> from pipe import uniq
>>> list([1, 1, 2, 2, 3, 3, 1, 2, 3] | uniq)
[1, 2, 3, 1, 2, 3]
>>> list([1, -1, 1, 2, -2, 2, 3, 3, 1, 2, 3] | uniq(key=abs))
[1, 2, 3, 1, 2, 3]
>>>

where(predicate), filter(predicate)

只产生给定可迭代对象中匹配的项:

>>> list([1, 2, 3] | where(lambda x: x % 2 == 0))
[2]
>>>

别忘了它们可以被别名:

>>> positive = where(lambda x: x > 0)
>>> negative = where(lambda x: x < 0)
>>> sum([-10, -5, 0, 5, 10] | positive)
15
>>> sum([-10, -5, 0, 5, 10] | negative)
-15
>>>

构建自己的管道

你可以使用 Pipe 类构建自己的管道,像这样:

from pipe import Pipe
square = Pipe(lambda iterable: (x ** 2 for x in iterable))
map = Pipe(lambda iterable, fct: builtins.map(fct, iterable)
>>>

正如你所见,编写通常非常简短,运气好的话,你要包装的函数已经将可迭代对象作为第一个参数,这使得包装变得简单:

>>> from collections import deque
>>> from pipe import Pipe
>>> end = Pipe(deque)
>>>

就这样,itrable | end(3) 就是 deque(iterable, 3):

>>> list(range(100) | end(3))
[97, 98, 99]
>>>

如果情况变得更复杂,可以使用 Pipe 作为装饰器,装饰一个以可迭代对象作为第一个参数,其他可选参数在后面的函数:

>>> from statistics import mean

>>> @Pipe
... def running_average(iterable, width):
...     items = deque(maxlen=width)
...     for item in iterable:
...         items.append(item)
...         yield mean(items)

>>> list(range(20) | running_average(width=2))
[0, 0.5, 1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5, 9.5, 10.5, 11.5, 12.5, 13.5, 14.5, 15.5, 16.5, 17.5, 18.5]
>>> list(range(20) | running_average(width=10))
[0, 0.5, 1, 1.5, 2, 2.5, 3, 3.5, 4, 4.5, 5.5, 6.5, 7.5, 8.5, 9.5, 10.5, 11.5, 12.5, 13.5, 14.5]
>>>

一次性管道

有时你只想要一行代码,在创建管道时可以直接指定函数的位置参数和命名参数

>>> from itertools import combinations

>>> list(range(5) | Pipe(combinations, 2))
[(0, 1), (0, 2), (0, 3), (0, 4), (1, 2), (1, 3), (1, 4), (2, 3), (2, 4), (3, 4)]
>>>

带有初始起始值的简单累加和

>>> from itertools import accumulate

>>> list(range(10) | Pipe(accumulate, initial=1))
[1, 1, 2, 4, 7, 11, 16, 22, 29, 37, 46]
>>>

或者根据某些条件过滤数据

>>> from itertools import compress

list(range(20) | Pipe(compress, selectors=[1, 0] * 10))
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
>>> list(range(20) | Pipe(compress, selectors=[0, 1] * 10))
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
>>>

欧拉项目示例

求1000以下所有3或5的倍数之和。

>>> sum(count() | where(lambda x: x % 3 == 0 or x % 5 == 0) | take_while(lambda x: x < 1000))
233168
>>>

求斐波那契数列中不超过四百万的偶数项之和。

sum(fib() | where(lambda x: x % 2 == 0) | take_while(lambda x: x < 4000000))

求前一百个自然数的平方和与和的平方之差。

>>> square = map(lambda x: x ** 2)
>>> sum(range(101)) ** 2 - sum(range(101) | square)
25164150
>>>

深入探讨

部分管道

可以在不求值的情况下参数化 pipe:

>>> running_average_of_two = running_average(2)
>>> list(range(20) | running_average_of_two)
[0, 0.5, 1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5, 9.5, 10.5, 11.5, 12.5, 13.5, 14.5, 15.5, 16.5, 17.5, 18.5]
>>>

对于多参数管道,可以部分初始化,你可以将其视为柯里化:

some_iterable | some_pipe(1, 2, 3)
some_iterable | Pipe(some_func, 1, 2, 3)

严格等同于:

some_iterable | some_pipe(1)(2)(3)

因此可以用来专门化管道,首先是一个简单的例子:

>>> @Pipe
... def addmul(iterable, to_add, to_mul):
...     """对输入的每一项计算 (x + to_add) * to_mul。"""
...     for i in iterable:
...         yield (i + to_add) * to_mul

>>> mul = addmul(0)  # 这部分初始化 addmul,设置 to_add=0
>>> list(range(10) | mul(10))
[0, 10, 20, 30, 40, 50, 60, 70, 80, 90]

这也适用于关键字参数:

>>> add = addmul(to_mul=1)  # 这部分初始化 addmul,设置 `to_mul=1`
>>> list(range(10) | add(10))
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
>>>

现在来看一些有趣的东西:

>>> import re
>>> @Pipe
... def grep(iterable, pattern, flags=0):
...     for line in iterable:
...         if re.match(pattern, line, flags=flags):
...             yield line
...
>>> lines = ["Hello", "hello", "World", "world"]
>>> for line in lines | grep("H"):
...     print(line)
Hello
>>>

现在让我们以两种方式重用它,首先是使用模式:

>>> lowercase_only = grep("[a-z]+$")
>>> for line in lines | lowercase_only:
...     print(line)
hello
world
>>>

或者现在使用标志:

>>> igrep = grep(flags=re.IGNORECASE)
>>> for line in lines | igrep("hello"):
...    print(line)
...
Hello
hello
>>>

惰性求值

Pipe 从头到尾都使用生成器,所以它本质上是惰性的。

在以下示例中,我们将使用 itertools.count:一个无限整数生成器。 我们还将使用tee管道,它会打印通过它的每个值。

以下示例什么也不做,tee不会打印任何内容,因此没有值通过它。这很好,因为生成无限平方序列是"缓慢"的。

>>> result = count() | tee | select(lambda x: x ** 2)
>>>

链接更多管道仍然不会使前面的管道开始生成值,在下面的示例中,没有一个值从count中被提取出来:

>>> result = count() | tee | select(lambda x: x ** 2)
>>> first_results = result | take(10)
>>> only_odd_ones = first_results | where(lambda x: x % 2)
>>>

不使用变量的相同示例:

>>> result = (count() | tee
...                   | select(lambda x: x ** 2)
...                   | take(10)
...                   | where(lambda x: x % 2))
>>>

只有当实际需要值时,生成器才开始工作。

在以下示例中,只会从count中提取两个值:

  • 0被平方(变成0),轻易通过take(10),但被where丢弃
  • 1被平方(变成1),也轻易通过take(10),通过where,并通过take(1)

此时take(1)已满足,所以不需要进行其他计算。注意tee打印通过它的01

>>> result = (count() | tee
...                   | select(lambda x: x ** 2)
...                   | take(10)
...                   | where(lambda x: x % 2))
>>> print(list(result | take(1)))
0
1
[1]
>>>

废弃

在pipe 1.x中,许多函数返回可迭代对象,而许多其他函数返回非可迭代对象,这造成了混淆。那些返回非可迭代对象的函数只能用作管道表达式的最后一个函数,所以它们实际上是无用的:

range(100) | where(lambda x: x % 2 == 0) | add

可以改写为同样易读的形式:

sum(range(100) | where(lambda x: x % 2 == 0))

因此,所有返回非可迭代对象的管道都被废弃(发出警告),并最终在pipe 2.0中被移除。

我应该怎么做?

哦,你刚刚升级了pipe,遇到了异常,然后来到这里?你有三个解决方案:

  1. 停止使用闭合管道,将...|...|...|...|as_list替换为list(...|...|...|),就是这样,甚至更短。

  2. 如果"闭合管道"对你来说不是问题,而你真的喜欢它们,只需重新实现你真正需要的几个,通常只需要很少的代码行,或者从这里复制它们。

  3. 如果你仍然依赖很多闭合管道并且时间紧迫,只需pip install pipe<2

并开始使用Python开发模式测试你的项目,这样你就可以在警告变成问题之前捕获它们。

但我喜欢它们,请重新引入它们!

这个问题已经在#74中讨论过了。

一个@Pipe通常可以在1到3行代码的函数中轻松实现,而pipe模块的目标不是提供所有可能性,而是提供Pipe装饰器。

因此,如果你需要更多管道、闭合管道、奇怪的管道等等,可以随意在你的项目中实现它们,并将已实现的管道视为如何实现的示例。

请参阅下面的"构建你自己的管道"段落。

项目侧边栏1项目侧边栏2
推荐项目
Project Cover

豆包MarsCode

豆包 MarsCode 是一款革命性的编程助手,通过AI技术提供代码补全、单测生成、代码解释和智能问答等功能,支持100+编程语言,与主流编辑器无缝集成,显著提升开发效率和代码质量。

Project Cover

AI写歌

Suno AI是一个革命性的AI音乐创作平台,能在短短30秒内帮助用户创作出一首完整的歌曲。无论是寻找创作灵感还是需要快速制作音乐,Suno AI都是音乐爱好者和专业人士的理想选择。

Project Cover

白日梦AI

白日梦AI提供专注于AI视频生成的多样化功能,包括文生视频、动态画面和形象生成等,帮助用户快速上手,创造专业级内容。

Project Cover

有言AI

有言平台提供一站式AIGC视频创作解决方案,通过智能技术简化视频制作流程。无论是企业宣传还是个人分享,有言都能帮助用户快速、轻松地制作出专业级别的视频内容。

Project Cover

Kimi

Kimi AI助手提供多语言对话支持,能够阅读和理解用户上传的文件内容,解析网页信息,并结合搜索结果为用户提供详尽的答案。无论是日常咨询还是专业问题,Kimi都能以友好、专业的方式提供帮助。

Project Cover

讯飞绘镜

讯飞绘镜是一个支持从创意到完整视频创作的智能平台,用户可以快速生成视频素材并创作独特的音乐视频和故事。平台提供多样化的主题和精选作品,帮助用户探索创意灵感。

Project Cover

讯飞文书

讯飞文书依托讯飞星火大模型,为文书写作者提供从素材筹备到稿件撰写及审稿的全程支持。通过录音智记和以稿写稿等功能,满足事务性工作的高频需求,帮助撰稿人节省精力,提高效率,优化工作与生活。

Project Cover

阿里绘蛙

绘蛙是阿里巴巴集团推出的革命性AI电商营销平台。利用尖端人工智能技术,为商家提供一键生成商品图和营销文案的服务,显著提升内容创作效率和营销效果。适用于淘宝、天猫等电商平台,让商品第一时间被种草。

Project Cover

AIWritePaper论文写作

AIWritePaper论文写作是一站式AI论文写作辅助工具,简化了选题、文献检索至论文撰写的整个过程。通过简单设定,平台可快速生成高质量论文大纲和全文,配合图表、参考文献等一应俱全,同时提供开题报告和答辩PPT等增值服务,保障数据安全,有效提升写作效率和论文质量。

投诉举报邮箱: service@vectorlightyear.com
@2024 懂AI·鲁ICP备2024100362号-6·鲁公网安备37021002001498号