1. 现象
以数据管道 (类似 Unix 管道) 的方式迭代处理数据。比如,有个大量的数据需要处理,但是不能将它们一次性放入内存中
2. 原因分析&基础知识
大量数据处理时,不建议一次性放到内存中,而是应该分批次存储
os.walk函数
os.walk(top, topdown=True, onerror=None, followlinks=False)
- top -- 所要遍历的目录的地址, 返回的是一个三元组(dirpath, dirnames, filenames)。dirpath 所指的是当前正在遍历的这个文件夹的本身的地址dirnames 是一个 list ,内容是dirpath该文件夹中所有的目录的名字(不包括子目录)filenames 同样是 list , 内容是dirpath该文件夹中所有的文件(不包括子目录)
- topdown --可选,为 True,则优先遍历 top 目录,否则优先遍历 top 的子目录(默认为开启)。如果 topdown 参数为 True,walk 会遍历top文件夹,与top 文件夹中每一个子目录。
- onerror -- 可选,需要一个 callable 对象,当 walk 需要异常时,会调用。
- followlinks -- 可选,如果为 True,则会遍历目录下的快捷方式(linux 下是软连接 symbolic link )实际所指的目录(默认关闭),如果为 False,则优先遍历 top 的子目录。
fnmatch.filter函数
fnmatch.filter(names, pattern)
文件名称的匹配,并且匹配的模式使用的unix shell风格通配符
- names -- 需要匹配的文件名列表
- pattern -- 通配符表达式
gzip.open函数
gzip.open(filename, mode='rb', compresslevel=9, encoding=None, errors=None, newline=None)
以二进制或者文本模式打开gzip压缩文件,返回文件对象
- flename -- 文件
- mode -- 打开模式,'r','rb', 'a', 'ab', 'w', 'wb', 'x' 或者'xb'都是二进制模式,'rt', 'at', 'wt', 'xt'文本模式,默认'rb'
- compresslevel -- 压缩等级,0-9等级,0表示不压缩,1表示最快压缩但是压缩最少,9表示最慢,但是压缩最大
- encoding -- 编码方式,只针对文本模式
- errors -- 当编码解码报错后处理方式
- newline -- 行结尾的处理方法,值可选None, '', '\n', '\r'或'\r\n'
bz2.open函数
bz2.open(filename, mode='r', compresslevel=9, encoding=None, errors=None, newline=None)
以二进制或者文本模式打开bzip2压缩文件,返回文件对象
与gzip参数相同
yield
yield 的作用就是把一个函数变成一个 generator 生成器。执行带yield的函数,结果会返回一个可迭代对象
yield from
yield from 和yield作用结果相同,只是from需要跟上一个可迭代对象
for item in iterators:
yield item
# 相当于
yield from iterators
3. 问题解决
生成器函数yield是一个实现管道机制的好办法。
为了演示,假定你要处理一个非常大的Centos下的日志文件目录/var/log
为了处理这些文件,可以定义一个由多个执行特定任务的简单生成器函数组成的容器,如:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : cory
# @Time : 2021/9/2922:09
# @Email: **
import bz2
import fnmatch
import gzip
import itertools
import os
import re
def gen_find(file_pattern, top):
"""
找到所有符合Unix通配符表达式的文件
:param file_pattern: 通配符表达式
:param top: 搜索的目录
:return: 迭代对象
"""
for root, dirs, files in os.walk(top, topdown=False):
for name in fnmatch.filter(files, file_pattern):
yield os.path.join(root, name)
def gen_opener(filenames):
"""
# 如果以bz2结尾:f = bz2.open(filename, 'rt')
:param filenames: 文件
:return: 返回文件对象
"""
for filename in filenames:
if filename.endswith('.gz'):
f = gzip.open(filename, 'rt')
else:
f = open(filename, 'rt')
yield f
f.close()
def gen_concatenate(iterators):
"""
:param iterators: 可迭代对象
:return:
"""
for it in iterators:
yield from it
def gen_grep(pattern, lines):
"""
返回匹配正则表达式的行
:param pattern: 正则表达式
:param lines: 预匹配的行
:return: 迭代行
"""
pat = re.compile(pattern)
for line in lines:
if pat.search(line):
yield line
if __name__ == '__main__':
sssd_files = gen_find('sssd*', '/var/log')
file_objs = gen_opener(sssd_files)
# lines_con = itertools.chain(*file_objs)
lines_con = gen_concatenate(file_objs)
services = gen_grep(r'(?i)service', lines_con)
for line_service in services:
print(line_service)
# 查找sssd 守护进程中带service字样的内容
以管道方式处理数据可以用来解决各类其他问题,包括解析,读取实时数据,定时轮询等。
为了理解上述代码,重点是要明白 yield 语句作为数据的生产者而 for 循环语句作为数据的消费者。当这些生成器被连在一起后,每个 yield 会将一个单独的数据元素传递给迭代处理管道的下一阶段
上述代码即便是在一个超大型文件目录中也能工作的很好。事实上,由于使用了迭代方式处理,代码运行过程中只需要很小很小的内存。
在调用 gen_concatenate() 函数是将输入序列拼接成一个很长的行序列。 itertools.chain函数同样有类似的功能,但是它需要将所有可迭代对象作为参数传入。如:linescon = itertools.chain(*fileobjs) 。使得gen_opener生成器都被消费掉了。由于 gen_opener() 生成器每次生成一个打开过的文件,等到下一个迭代步骤时文件就关闭了,因此 chain 在这里不能这样使用。上面的方案可以避免这种情况
如果使用chain函数,报错如下:
lines_con = itertools.chain(*file_objs)