Python装饰器限制函数运行时间,超时退出
563 字约 2 分钟
2024-12-01
方法一、使用 signal
import signal
import time
def set_timeout(num, callback):
def wrap(func):
def handle(signum, frame): # 收到信号 SIGALRM 后的回调函数,第一个参数是信号的数字,第二个参数是the interrupted stack frame.
raise RuntimeError
def to_do(*args, **kwargs):
try:
signal.signal(signal.SIGALRM, handle) # 设置信号和回调函数
signal.alarm(num) # 设置 num 秒的闹钟
print('start alarm signal.')
r = func(*args, **kwargs)
print('close alarm signal.')
signal.alarm(0) # 关闭闹钟
return r
except RuntimeError as e:
callback()
return to_do
return wrap
def after_timeout(): # 超时后的处理函数
print("Time out!")
@set_timeout(2, after_timeout) # 限时 2 秒超时
def connect(): # 要执行的函数
time.sleep(3) # 函数执行时间,写大于2的值,可测试超时
print('Finished without timeout.')
if __name__ == '__main__':
connect()
注意:使用的signal有所限制,需要在linux系统上,并且需要在主线程中使用,否则无效
方法二. 使用Thread
from time import sleep, time
import sys, threading
class KThread(threading.Thread):
"""A subclass of threading.Thread, with a kill()
method.
Come from:
Kill a thread in Python:
http://mail.python.org/pipermail/python-list/2004-May/260937.html
"""
def __init__(self, *args, **kwargs):
threading.Thread.__init__(self, *args, **kwargs)
self.killed = False
def start(self):
"""Start the thread."""
self.__run_backup = self.run
self.run = self.__run # Force the Thread to install our trace.
threading.Thread.start(self)
def __run(self):
"""Hacked run function, which installs the
trace."""
sys.settrace(self.globaltrace)
self.__run_backup()
self.run = self.__run_backup
def globaltrace(self, frame, why, arg):
if why == 'call':
return self.localtrace
else:
return None
def localtrace(self, frame, why, arg):
if self.killed:
if why == 'line':
raise SystemExit()
return self.localtrace
def kill(self):
self.killed = True
class Timeout(Exception):
"""function run timeout"""
def set_timeout(seconds, callback_data):
"""超时装饰器,指定超时时间
若被装饰的方法在指定的时间内未返回,则抛出Timeout异常"""
def timeout_decorator(func):
"""真正的装饰器"""
def _new_func(oldfunc, result, oldfunc_args, oldfunc_kwargs):
result.append(oldfunc(*oldfunc_args, **oldfunc_kwargs))
def _(*args, **kwargs):
result = []
new_kwargs = { # create new args for _new_func, because we want to get the func return val to result list
'oldfunc': func,
'result': result,
'oldfunc_args': args,
'oldfunc_kwargs': kwargs
}
thd = KThread(target=_new_func, args=(), kwargs=new_kwargs)
thd.start()
thd.join(seconds)
alive = thd.isAlive()
thd.kill() # kill the child thread
if alive:
# raise Timeout(u'function run too long, timeout %d seconds.' % seconds)
try:
return callback_data
# raise Timeout(u'function run too long, timeout %d seconds.' % seconds)
finally:
# return u'function run too long, timeout %d seconds.' % seconds
return callback_data
else:
return result[0]
_.__name__ = func.__name__
_.__doc__ = func.__doc__
return _
return timeout_decorator
@set_timeout(2, None) # 限时 2 秒超时
def connect(): # 要执行的函数
time.sleep(3) # 函数执行时间,写大于2的值,可测试超时
print('Finished without timeout.')
if __name__ == '__main__':
connect()
方法二使用线程的版本可以支持多线程