微信搜索superit|邀请体验:大数据, 数据管理、OLAP分析与可视化平台 | 赞助作者:赞助作者

[Python 多线程] Lock、阻塞锁、非阻塞锁 (八)

python aide_941 24℃

[Python 多线程] Lock、阻塞锁、非阻塞锁 (八)

 

线程同步技术:

解决多个线程争抢同一个资源的情况,线程协作工作。一份数据同一时刻只能有一个线程处理。

 

解决线程同步的几种方法:

Lock、RLock、Condition、Barrier、semaphore

 

1)Lock 锁

锁,一旦线程获得锁,其它试图获取锁的线程将被阻塞。

当用阻塞参数设置为 False 时, 不要阻止。如果将阻塞设置为 True 的调用将阻止, 则立即返回 False;否则, 将锁定设置为锁定并返回 True。

 

Lock的方法:
acquire(blocking=True,timeout=-1)  加锁。默认True阻塞,阻塞可以设置超时时间。非阻塞时成功获取锁返回True,否则返回False。

当blocking设置为False时,不阻塞,同一个锁对象,其它线程可以重用,但最后都必须释放。

如果设置为True(默认True),其它试图调用锁的线程将阻塞,并立即返回False。阻塞可以设置超时时间。

 

release() 释放锁。可以从任何线程调用释放。已上锁的锁,会被重置为unlocked,对未上锁的锁调用,会抛RuntimeError异常: cannot release un-acquired lock。

 

 

不使用Lock的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#不使用Lock锁的例子
import logging
import threading,time
logging.basicConfig(level=logging.INFO)
# 10 -> 100cups
cups = []
lock = threading.Lock()
def worker(lock:threading.Lock,task=100):
    while True:
        count = len(cups)
        time.sleep(0.1)
        if count >= task:
            break
        logging.info(count)
        cups.append(1)
        logging.info("{} make 1........ ".format(threading.current_thread().name))
    logging.info("{} ending=======".format(len(cups)))
for in range(10):
    threading.Thread(target=worker,args=(lock,100)).start()
运行结果:
INFO:root:Thread-7 make 1........
INFO:root:93
INFO:root:Thread-5 make 1........
INFO:root:95
INFO:root:Thread-6 make 1........
INFO:root:92
INFO:root:Thread-2 make 1........
INFO:root:94
INFO:root:Thread-8 make 1........
INFO:root:97
INFO:root:Thread-10 make 1........
INFO:root:96
INFO:root:Thread-4 make 1........
INFO:root:98
INFO:root:Thread-1 make 1........
INFO:root:99
INFO:root:Thread-9 make 1........
INFO:root:109 ending=======
INFO:root:109 ending=======
INFO:root:109 ending=======

还是使用前面的10个工人生产100杯子的例子, 当做到99个杯子时,10个工人都发现还少一个,都去做了一个,一共做了109个,超出了100个,就发生了不可预期的结果。

 

临界线判断失误,多生产了杯子。

 

解决方法就可以用锁,来解决资源争抢。当一个人看杯子数量时,就上锁,其它人只能等着,看完杯子后发现少一个就把这最后一个做出来,然后数量加一,解锁,其他人再看到已经有100个杯子时,就可以停止工作。

 

加锁的时机非常重要:看杯子数量时加锁,增加数量后释放锁。

 

使用Lock的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#Lock
import logging
import threading
import time
logging.basicConfig(level=logging.INFO)
# 10 -> 100cups
cups = []
lock = threading.Lock()
def worker(lock:threading.Lock,task=100):
    while True:
        if lock.acquire(False):
            count = len(cups)
            time.sleep(0.1)
            
            if count >= task:
                lock.release()
                break
            logging.info(count)
            cups.append(1)
            lock.release()
            logging.info("{} make 1........ ".format(threading.current_thread().name))
    logging.info("{} ending=======".format(len(cups)))
for in range(10):
    threading.Thread(target=worker,args=(lock,100)).start()
运行结果:
INFO:root:0
INFO:root:Thread-1 make 1........
INFO:root:1
INFO:root:Thread-5 make 1........
INFO:root:2
INFO:root:Thread-6 make 1........
....
INFO:root:Thread-3 make 1........
INFO:root:97
INFO:root:Thread-3 make 1........
INFO:root:98
INFO:root:Thread-4 make 1........
INFO:root:99
INFO:root:Thread-3 make 1........
INFO:root:100 ending=======
INFO:root:100 ending=======
INFO:root:100 ending=======
.....

在使用了锁以后,虽然保证了结果的准确性,但是性能下降了很多。

 

一般来说加锁以后还要有一些功能实现,在释放之前还有可能抛异常,一旦抛出异常,锁是无法释放,但是当前线程可能因为这个异常被终止了,这就产生了死锁。

 

死锁解决办法:

1、使用 try..except..finally 语句处理异常、保证锁的释放

2、with 语句上下文管理,锁对象支持上下文管理。只要实现了__enter__和__exit__魔术方法的对象都支持上下文管理。

 

锁的应用场景:

独占锁: 锁适用于访问和修改同一个共享资源的时候,即读写同一个资源的时候。

共享锁: 如果共享资源是不可变的值时,所有线程每一次读取它都是同一样的值,这样的情况就不需要锁。

 

使用锁的注意事项:

  • 少用锁,必要时用锁。使用了锁,多线程访问被锁的资源时,就变成了串行,要么排队执行,要么争抢执行。
  • 加锁时间越短越好,不需要就立即释放锁。
  • 一定要避免死锁。

不使用锁时,有了效率,但是结果是错的。

使用了锁,变成了串行,效率地下,但是结果是对的。

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import threading
import time
lock = threading.Lock()
def work():
    print('working..')
    time.sleep(0.2)
    lock.release() # 1解锁
lock.acquire() # 1上锁
print("get locker 1")
threading.Thread(target=work).start()
time.sleep(1)
lock.acquire() # 2上锁
print("get locker 2")
threading.Thread(target=work).start()
print("release locker")
运行结果:
get locker 1
working..
get locker 2
working..
release locker

同一个锁对象在释放后可以再次使用。

但是如果同一把锁加锁后,又被别人拿了,自己就阻塞了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import threading
import time
lock = threading.Lock()
def work():
    print('working..')
    time.sleep(0.2)
    lock.release() # 1解锁
lock.acquire() # 1上锁
print("get locker 1")
lock.acquire() # 2上锁
print("get locker 2")
threading.Thread(target=work).start()
threading.Thread(target=work).start()
print("release locker")
运行结果:
get locker 1
阻塞状态....

 

阻塞锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#阻塞锁
import threading,time
lock = threading.Lock()
def foo():
    ret = lock.acquire()
    print("{} Locked. {}".format(ret,threading.current_thread()))
    time.sleep(10)
threading.Thread(target=foo).start()
threading.Thread(target=foo).start()
运行结果:
True Locked. <Thread(Thread-1, started 123145559191552)>

lock.acquire()默认设置blocking=True,两个线程使用同一个Lock锁对象,只要Thread-1线程不释放,第二个线程就无法获取锁,且会使Thread-1线程阻塞。

如果想让多个线程同时都可以使用一个锁对象,就必须使用非阻塞锁,或者第一个线程使用完锁之后立刻释放,然后第二个线程再使用。

 

非阻塞锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#非阻塞锁
import threading,time
lock = threading.Lock()
def foo():
    ret = lock.acquire(False)
    print("{} Locked. {}".format(ret,threading.current_thread()))
    time.sleep(10)
threading.Thread(target=foo).start()
threading.Thread(target=foo).start()
运行结果:
True Locked. <Thread(Thread-1, started 123145516146688)>
False Locked. <Thread(Thread-2, started 123145521401856)>
Process finished with exit code 0

lock.acquire(False)设置blocking=False表示不阻塞,使用同一个Lock锁对象时,第二个线程仍可以使用锁,且第一个锁不会被阻塞。

 

非阻塞锁2:

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#非阻塞锁
import threading,logging,time
FORMAT = '%(asctime)s\t [%(threadName)s,%(thread)d] %(message)s'
logging.basicConfig(level=logging.INFO,format=FORMAT)
def worker(tasks):
    for task in tasks:
        time.sleep(0.01)
        if task.lock.acquire(False): #False非阻塞
            logging.info('{} {} begin to start'.format(threading.current_thread().name,task.name))
        else:
            logging.info('{} {} is working'.format(threading.current_thread().name,task.name))
class Task:
    def __init__(self,name):
        self.name = name
        self.lock = threading.Lock()
tasks = [Task('task={}'.format(t)) for in range(5)]
for in range(3):
    = threading.Thread(target=worker,name='worker-{}'.format(i),args=(tasks,))
    t.start()
运行结果:
2017-12-19 16:37:49,556  [worker-2,123145390018560] worker-2 task=0 begin to start
2017-12-19 16:37:49,556  [worker-1,123145384763392] worker-1 task=0 is working
2017-12-19 16:37:49,557  [worker-0,123145379508224] worker-0 task=0 is working
2017-12-19 16:37:49,567  [worker-2,123145390018560] worker-2 task=1 begin to start
2017-12-19 16:37:49,567  [worker-1,123145384763392] worker-1 task=1 is working
2017-12-19 16:37:49,568  [worker-0,123145379508224] worker-0 task=1 is working
2017-12-19 16:37:49,580  [worker-1,123145384763392] worker-1 task=2 begin to start
2017-12-19 16:37:49,580  [worker-2,123145390018560] worker-2 task=2 is working
2017-12-19 16:37:49,580  [worker-0,123145379508224] worker-0 task=2 is working
2017-12-19 16:37:49,591  [worker-1,123145384763392] worker-1 task=3 begin to start
2017-12-19 16:37:49,592  [worker-2,123145390018560] worker-2 task=3 is working
2017-12-19 16:37:49,592  [worker-0,123145379508224] worker-0 task=3 is working
2017-12-19 16:37:49,604  [worker-1,123145384763392] worker-1 task=4 begin to start
2017-12-19 16:37:49,604  [worker-2,123145390018560] worker-2 task=4 is working
2017-12-19 16:37:49,604  [worker-0,123145379508224] worker-0 task=4 is working

 

转载请注明:SuperIT » [Python 多线程] Lock、阻塞锁、非阻塞锁 (八)

喜欢 (0)or分享 (0)