长亭百川云 - 文章详情

Python中实现轮询raw_input和手机扫码动作

李姐姐的扫描器

47

2024-07-13

下午,同事问我:有办法终止一个阻塞的raw_input吗?

他希望:用户既可以在终端标准输入,也可以选择外部手机扫码输入。任意条件满足的情况下,程序都能继续向后执行。 

但是,上述raw_input会阻塞,导致不能检查是否有成功扫码。

为了得到想要的效果,我写了一个示例程序,方法如下:

  • 从终端读取标准输入的阻塞操作,必然不能在原先的函数中执行,交由子进程处理

  • 扫码确认的动作,交给1个单独线程轮询处理,这里,我换成了本地文件读取演示

`# -*- coding: utf-8 -*-``import threading``import sys``import os``import time``import multiprocessing``   ``   ``def get_input(file_no, queue_motp):`    `stdin_obj = os.fdopen(file_no)`    `code = stdin_obj.readline().strip()`    `queue_motp.put(code)`    `stdin_obj.close()``   ``   ``class GetMOTP(threading.Thread):`    `def __init__(self):`        `threading.Thread.__init__(self)``   ``   `    `def run(self):`        `global flag`        `self.motp_input_done = False`        `sys.stdout.write("motp>")`        `manager = multiprocessing.Manager()`        `self.queue_motp = queue_motp = manager.Queue()``   `        `p = multiprocessing.Process(target=get_input, args=(sys.stdin.fileno(), queue_motp))    # 读取标准输入`        `p.start()`        `threading.Thread(target=self.check_from_disk).start()    # 本地读取`        `while queue_motp.empty():`            `time.sleep(0.1)`        `self.motp_input_done = True`        `p.terminate()`        `motp = queue_motp.get_nowait()`        `print("motp: ", motp)`        `if motp == "good":`            `flag = True``   `    `# 模拟扫码逻辑`    `def check_from_disk(self):`        `while not self.motp_input_done:`            `if os.path.exists("motp.txt"):`                `with open("motp.txt", "r") as f:`                    `code = f.read()`                    `if code.strip():`                        `self.queue_motp.put(code.strip())`                        `return True`            `time.sleep(0.2)``   ``   ``if __name__ == '__main__':`    `GetMOTP().start()`

以上,无论是用户在终端输入,还是扫码轮询成功,都可以取到输入,并同时终止2类等待,继续执行后续的程序逻辑。

相关推荐
关注或联系我们
添加百川云公众号,移动管理云安全产品
咨询热线:
4000-327-707
百川公众号
百川公众号
百川云客服
百川云客服

Copyright ©2024 北京长亭科技有限公司
icon
京ICP备 2024055124号-2