上篇文章简单介绍了multiprocessing模块,本文将要介绍进程之间的数据共享和信息传递的概念。
创新互联是一家专业提供卫东企业网站建设,专注与做网站、成都做网站、H5页面制作、小程序制作等业务。10年已为卫东众多企业、政府机构等服务。创新互联专业网站设计公司优惠进行中。
在多进程处理中,所有新创建的进程都会有这两个特点:独立运行,有自己的内存空间。
我们来举个例子展示一下:
这个程序的输出结果是:
在上面的程序中我们尝试在两个地方打印全局列表result的内容:
我们再用一张图来帮助理解记忆不同进程间的数据关系:
如果程序需要在不同的进程之间共享一些数据的话,该怎么做呢?不用担心,multiprocessing模块提供了Array对象和Value对象,用来在进程之间共享数据。
所谓Array对象和Value对象分别是指从共享内存中分配的ctypes数组和对象。我们直接来看一个例子,展示如何用Array对象和Value对象在进程之间共享数据:
程序输出的结果如下:
成功了!主程序和p1进程输出了同样的结果,说明程序中确实完成了不同进程间的数据共享。那么我们来详细看一下上面的程序做了什么:
在主程序中我们首先创建了一个Array对象:
向这个对象输入的第一个参数是数据类型:i表示整数,d代表浮点数。第二个参数是数组的大小,在这个例子中我们创建了包含4个元素的数组。
类似的,我们创建了一个Value对象:
我们只对Value对象输入了一个参数,那就是数据类型,与上述的方法一致。当然,我们还可以对其指定一个初始值(比如10),就像这样:
随后,我们在创建进程对象时,将刚创建好的两个对象:result和square_sum作为参数输入给进程:
在函数中result元素通过索引进行数组赋值,square_sum通过 value 属性进行赋值。
注意:为了完整打印result数组的结果,需要使用 result[:] 进行打印,而square_sum也需要使用 value 属性进行打印:
每当python程序启动时,同时也会启动一个服务器进程。随后,只要我们需要生成一个新进程,父进程就会连接到服务器并请求它派生一个新进程。这个服务器进程可以保存Python对象,并允许其他进程使用代理来操作它们。
multiprocessing模块提供了能够控制服务器进程的Manager类。所以,Manager类也提供了一种创建可以在不同流程之间共享的数据的方法。
服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型,如列表、字典、队列、值、数组等。此外,单个管理器可以由网络上不同计算机上的进程共享。
但是,服务器进程管理器的速度比使用共享内存要慢。
让我们来看一个例子:
这个程序的输出结果是:
我们来理解一下这个程序做了什么:首先我们创建了一个manager对象
在with语句下的所有行,都是在manager对象的范围内的。接下来我们使用这个manager对象创建了列表(类似的,我们还可以用 manager.dict() 创建字典)。
最后我们创建了进程p1(用于在records列表中插入一条新的record)和p2(将records打印出来),并将records作为参数进行传递。
服务器进程的概念再次用下图总结一下:
为了能使多个流程能够正常工作,常常需要在它们之间进行一些通信,以便能够划分工作并汇总最后的结果。multiprocessing模块支持进程之间的两种通信通道:Queue和Pipe。
使用队列来回处理多进程之间的通信是一种比较简单的方法。任何Python对象都可以使用队列进行传递。我们来看一个例子:
上面这个程序的输出结果是:
我们来看一下上面这个程序到底做了什么。首先我们创建了一个Queue对象:
然后,将这个空的Queue对象输入square_list函数。该函数会将列表中的数平方,再使用 put() 方法放入队列中:
随后使用 get() 方法,将q打印出来,直至q重新称为一个空的Queue对象:
我们还是用一张图来帮助理解记忆:
一个Pipe对象只能有两个端点。因此,当进程只需要双向通信时,它会比Queue对象更好用。
multiprocessing模块提供了 Pipe() 函数,该函数返回由管道连接的一对连接对象。 Pipe() 返回的两个连接对象分别表示管道的两端。每个连接对象都有 send() 和 recv() 方法。
我们来看一个例子:
上面这个程序的输出结果是:
我们还是来看一下这个程序到底做了什么。首先创建了一个Pipe对象:
与上文说的一样,该对象返回了一对管道两端的两个连接对象。然后使用 send() 方法和 recv() 方法进行信息的传递。就这么简单。在上面的程序中,我们从一端向另一端发送一串消息。在另一端,我们收到消息,并在收到END消息时退出。
要注意的是,如果两个进程(或线程)同时尝试从管道的同一端读取或写入管道中的数据,则管道中的数据可能会损坏。不过不同的进程同时使用管道的两端是没有问题的。还要注意,Queue对象在进程之间进行了适当的同步,但代价是增加了计算复杂度。因此,Queue对象对于线程和进程是相对安全的。
最后我们还是用一张图来示意:
Python的multiprocessing模块还剩最后一篇文章:多进程的同步与池化
敬请期待啦!
日常工作中需要大量、频繁地使用ssh到服务器查看、拉取相关的信息或者对服务器进行变更。目前公司大量使用的shell,但是随着逻辑的复杂化、脚本管理的精细化,shell已经不满足日常需求,于是我尝试整合工作中的需求,制作适合的工具。 由于管理制度的缺陷,我以工作流程为核心思考适合自己的运维方式,提升工作效率,把时间留给更有价值的事情。 完整代码在最后,请大家参考。
生产:4000+物理服务器,近 3000 台虚拟机。
开发环境:python3.6、redhat7.9,除了paramiko为第三方模块需要自己安装,其他的直接import即可。
批量执行操作是一把双刃剑。批量执行操作可以提升工作效率,但是随之而来的风险不可忽略。
风险案例如下:
挂载很多数据盘,通常先格式化硬盘,再挂载数据盘,最后再写入将开机挂载信息写入/etc/fstab文件。在批量lsblk检查硬盘信息的时候发现有的系统盘在/sda有的在/sdm,如果不事先检查机器相关配置是否一致直接按照工作经验去执行批量操作,会很容易造成个人难以承受的灾难。
在执行批量操作时按照惯例:格式化硬盘-挂载-开机挂载的顺序去执行,假设有的机器因为某些故障导致格式化硬盘没法正确执行。在处理这类问题的时候通常会先提取出失败的ip,并再按照惯例执行操作。运维人员会很容易忽略开机挂载的信息已经写过了,导致复写(这都是血和泪的教训)。
所以,为了避免故障,提升工作效率,我认为应当建立团队在工作上的共识,应当遵守以下原则:
当然,代码的规范也应当重视起来,不仅是为了便于审计,同时也需要便于溯源。我认为应当注意以下几点:
1、ssh no existing session,sftp超时时间设置:
在代码无错的情况下大量ip出现No existing session,排查后定位在代码的写法上,下面是一个正确的示例。由于最开始没考虑到ssh连接的几种情况导致了重写好几遍。另外sftp的实例貌似不能直接设置连接超时时间,所以我采用了先建立ssh连接再打开sftp的方法。
2、sftp中的get()和put()方法仅能传文件,不支持直接传目录:
不能直接传目录,那换个思路,遍历路径中的目录和文件,先创建目录再传文件就能达到一样的效果了。在paramiko的sftp中s方法可以获取远程路径中的文件、目录信息。那么我们可以写一个递归来遍历远程路径中的所有文件和目录(传入一个列表是为了接收递归返回的值)。
python自带的os模块中的os.walk()方法可以遍历到本地路径中的目录和文件。
3、多线程多个ip使用s方法时无法并发。
改成多进程即可。
4、多个ip需要执行相同命令或不同的命令。
由于是日常使用的场景不会很复杂,所以借鉴了ansible的playbook,读取提前准备好的配置文件即可,然后再整合到之前定义的ssh函数中。
同时,我们还衍生出一个需求,既然都要读取配置,那同样也可以提前把ip地址准备在文件里。正好也能读取我们返回的执行程序的结果。
参数说明:
密码认证:
公钥认证:
可以配合 grep,awk 等命令精准过滤。
个人认为 Python 在初中级运维工作中的性质更像是工具,以提升工作效率、减少管理成本为主。可以从当前繁琐的工作中解脱出来,去 探索 更有价值的事情。python 本质上并不会减少故障的产生,所以在不同的阶段合理利用自身掌握的知识解决当前最重要的痛点,千万不要本末倒置。
.put 不清楚
str() 数据类型转换,转换成字符串。如:str(123)='123'
Queue 叫队列,是数据结构中的一种,基本上所有成熟的编程语言都内置了对 Queue 的支持。
Python 中的 Queue 模块实现了多生产者和多消费者模型,当需要在多线程编程中非常实用。而且该模块中的 Queue 类实现了锁原语,不需要再考虑多线程安全问题。
该模块内置了三种类型的 Queue,分别是 class queue.Queue(maxsize=0) , class queue.LifoQueue(maxsize=0) 和 class queue.PriorityQueue(maxsize=0) 。它们三个的区别仅仅是取出时的顺序不一致而已。
Queue 是一个 FIFO 队列,任务按照添加的顺序被取出。
LifoQueue 是一个 LIFO 队列,类似堆栈,后添加的任务先被取出。
PriorityQueue 是一个优先级队列,队列里面的任务按照优先级排序,优先级高的先被取出。
如你所见,就是上面所说的三种不同类型的内置队列,其中 maxsize 是个整数,用于设置可以放入队列中的任务数的上限。当达到这个大小的时候,插入操作将阻塞至队列中的任务被消费掉。如果 maxsize 小于等于零,则队列尺寸为无限大。
向队列中添加任务,直接调用 put() 函数即可
put() 函数完整的函数签名如下 Queue.put(item, block=True, timeout=None) ,如你所见,该函数有两个可选参数。
默认情况下,在队列满时,该函数会一直阻塞,直到队列中有空余的位置可以添加任务为止。如果 timeout 是正数,则最多阻塞 timeout 秒,如果这段时间内还没有空余的位置出来,则会引发 Full 异常。
当 block 为 false 时,timeout 参数将失效。同时如果队列中没有空余的位置可添加任务则会引发 Full 异常,否则会直接把任务放入队列并返回,不会阻塞。
另外,还可以通过 Queue.put_nowait(item) 来添加任务,相当于 Queue.put(item, False) ,不再赘述。同样,在队列满时,该操作会引发 Full 异常。
从队列中获取任务,直接调用 get() 函数即可。
与 put() 函数一样, get() 函数也有两个可选参数,完整签名如下 Queue.get(block=True, timeout=None) 。
默认情况下,当队列空时调用该函数会一直阻塞,直到队列中有任务可获取为止。如果 timeout 是正数,则最多阻塞 timeout 秒,如果这段时间内还没有任务可获取,则会引发 Empty 异常。
当 block 为 false 时,timeout 参数将失效。同时如果队列中没有任务可获取则会立刻引发 Empty 异常,否则会直接获取一个任务并返回,不会阻塞。
另外,还可以通过 Queue.get_nowait() 来获取任务,相当于 Queue.get(False) ,不再赘述。同样,在队列为空时,该操作会引发 Empty 异常。
Queue.qsize() 函数返回队列的大小。注意这个大小不是精确的,qsize() 0 不保证后续的 get() 不被阻塞,同样 qsize() maxsize 也不保证 put() 不被阻塞。
如果队列为空,返回 True ,否则返回 False 。如果 empty() 返回 True ,不保证后续调用的 put() 不被阻塞。类似的,如果 empty() 返回 False ,也不保证后续调用的 get() 不被阻塞。
如果队列是满的返回 True ,否则返回 False 。如果 full() 返回 True 不保证后续调用的 get() 不被阻塞。类似的,如果 full() 返回 False 也不保证后续调用的 put() 不被阻塞。
queue.Queue() 是 FIFO 队列,出队顺序跟入队顺序是一致的。
queue.LifoQueue() 是 LIFO 队列,出队顺序跟入队顺序是完全相反的,类似于栈。
优先级队列中的任务顺序跟放入时的顺序是无关的,而是按照任务的大小来排序,最小值先被取出。那任务比较大小的规则是怎么样的呢。
注意,因为列表的比较对规则是按照下标顺序来比较的,所以在没有比较出大小之前 ,队列中所有列表对应下标位置的元素类型要一致。
好比 [2,1] 和 ["1","b"] 因为第一个位置的元素类型不一样,所以是没有办法比较大小的,所以也就放入不了优先级队列。
然而对于 [2,1] 和 [1,"b"] 来说即使第二个元素的类型不一致也是可以放入优先级队列的,因为只需要比较第一个位置元素的大小就可以比较出结果了,就不需要比较第二个位置元素的大小了。
但是对于 [2,1] 和 1 [2,"b"] 来说,则同样不可以放入优先级队列,因为需要比较第二个位置的元素才可以比较出结果,然而第二个位置的元素类型是不一致的,无法比较大小。
综上,也就是说, 直到在比较出结果之前,对应下标位置的元素类型都是需要一致的 。
下面我们自定义一个动物类型,希望按照年龄大小来做优先级排序。年龄越小优先级越高。
本章节介绍了队列以及其常用操作。因为队列默认实现了锁原语,因此在多线程编程中就不需要再考虑多线程安全问题了,对于程序员来说相当友好了。