@nosklo's twisted-based Solution सुरुचिपूर्ण और व्यावहारिक है, लेकिन यदि आप मुड़ पर निर्भरता से बचना चाहते हैं, तो कार्य अभी भी करने योग्य है, उदा:
import multiprocessing
def query_with_timeout(dbc, timeout, query, *a, **k):
conn1, conn2 = multiprocessing.Pipe(False)
subproc = multiprocessing.Process(target=do_query,
args=(dbc, query, conn2)+a,
kwargs=k)
subproc.start()
subproc.join(timeout)
if conn1.poll():
return conn1.recv()
subproc.terminate()
raise TimeoutError("Query %r ran for >%r" % (query, timeout))
def do_query(dbc, query, conn, *a, **k):
cu = dbc.cursor()
cu.execute(query, *a, **k)
return cu.fetchall()