这篇博客介绍 PyTorch 中自动微分引擎的实现,主要分为三部分:首先简要介绍一下计算图的原理;然后介绍 PyTorch 中与 autograd 的相关数据结构和 backward()
函数的实现,数据结构包括 torch::autograd::Variable
, torch::autograd::Function
等;最后讲一下动态建立计算图的实现,这部分代码涉及到动态派发机制,而且都是用脚本生成的,不太容易理解。
目录
计算图简介
计算图是一个有向图,它的每个节点都表示一个函数(如加减乘除等)或者输入数据(叶子节点)。计算图的边代表数据流向:指向某个节点的边为该节点的输入,由该节点流出的边表示它的输出。计算图可以用来描述神经网络的计算,如下图描述了 \(y = \sin(xa+b)\) 的计算过程:
计算图的求值分为前向传播和反向传播,分别用于计算输出和梯度。前向传播的过程就是从叶子节点开始遍历计算图,直到整个图都被遍历过;而反向传播就是从输出节点开始遍历,节点表示的函数也变为原函数对输入的导数。举个栗子,下面是addcmul()
函数的计算图的前向和反向计算过程:
Autograd Engine
介绍完了标准的计算图结构,那么PyTorch里面是怎么实现的呢?在回答这个问题之前首先要看几个相关的数据结构,分别是Variable
, AutogradMeta
, Function
和Edge
。
Variable
相信用过老版本的PyTorch的小伙伴对Variable
一定不会陌生,它是用来实现自动微分的核心数据结构,代码在torch/csrc/autograd/variable.h
。Variable
可以表示计算图中的叶子节点,如权重,也可以表示图中的中间变量,虽然在新版本中Tensor
与Variable
合并了,在前端中可以直接用Tensor
代替Variable
,但是Variable
并没有消失。
它的实现确实改变了,但功能依旧。Variable
继承自at::Tensor
,重载了Tensor
里与梯度计算相关的方法,同时也提供了和Tensor
隐式转换的构造函数。
Variable
的底层实现Variable::Impl
也继承at::TensorImpl
,这个类在此系列的第一篇 中介绍过,它里面有一个成员bool is_variable
,用于识别这个 Tensor 到底是at::Tensor
还是Variable
。TensorImpl
里还有一个重要的成员:
1 std ::unique_ptr <c10::AutogradMetaInterface> autograd_meta_=nullptr ;
autograd_meta_
记录了当前与Variable
相关的计算图信息,在Variable::Impl
里它被强制转换成AutogradMeta*
类型:
1 2 3 Variable::AutogradMeta* get_autograd_meta () const { return static_cast <Variable::AutogradMeta*>(autograd_meta()); }
其中AutogradMeta
类型实现了c10::AutogradMetaInterface
接口。
AutogradMeta
AutogradMeta
的声明也在variable.h
中,它的声明如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 struct TORCH_API Variable : :AutogradMeta : public c10::AutogradMetaInterface { std ::string name; Variable grad_; std ::shared_ptr <Function> grad_fn_; std ::weak_ptr<Function> grad_accumulator_; VariableVersion version_counter_; std ::vector <std ::shared_ptr <FunctionPreHook>> hooks_; bool requires_grad_; bool is_view_; uint32_t output_nr_; PyObject* pyobj_ = nullptr ; std ::mutex mutex_; void set_requires_grad (bool requires_grad, at::TensorImpl* self_impl) override { requires_grad_ = requires_grad; } bool requires_grad () const override { return requires_grad_ || grad_fn_; } Variable& grad () override { return grad_; } const Variable& grad () const override { return grad_; } };
从声明中可以看出AutogradMeta
不但存储了梯度,还存储了该Variable
对应的反向传播函数,也就是计算图的节点。
PyTorch只建立反向传播的计算图,因为其实前向传播是用户自己定义的,不用PyTorch干什么。但是在用户在定义前向连接的时候,PyTorch需要偷偷建立反向连接,具体怎么操作见下一节。PyTorch里计算图的节点全部都是Function
,由于只计算图只用于反向传播,所以 Function
实现都是反向传播函数。对于计算图中的中间结果,对应的grad_fn_
是相应的反向传播函数;而对于叶节点(权重),对应的grad_fn_
为nullptr
,而grad_accumulator_
为其相应的处理函数,就是把梯度累加存储到grad_
里。除此之外,不需要梯度的输入是不会进入计算图中的。
仔细观察我们会发现,Variable
里拥有AutogradMeta
,而后者里用有Function
,所以实际上Variable
和它的grad_fn_
是绑定的,也就是说,一个Variable
只能有一个grad_fn_
,但反过来则不一定,一个grad_fn_
也可能属于多个Variable
(如果正向传播函数输出很多的话,那么这些输出共享一个反向传播函数)。
Function & Edge
Function
和Edge
实现是紧密相连的,Function
本质是一个函数对象,可以当作反向传播的函数来用。除此之外,Function
里还有一个成 edge_list next_edges_;
,是与该节点相连的边。PyTorch的计算图中的边其实就是{function, input_nr}
pair:前者表示这条边指向哪个节点,后者表示本节点是目标节点的第几个输入(从0开始)。
Function
的实现在csrc/autograd/function.h
,大致声明如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 struct TORCH_API Function : std ::enable_shared_from_this<Function> { public : Function(const Function& other) = delete ; Function(Function&& other) = delete ; Function& operator =(const Function& other) = delete ; Function& operator =(Function&& other) = delete ; virtual ~Function() = default ; variable_list operator () (variable_list&& inputs) { profiler::RecordFunction rec (this ) ; return apply(std ::move(inputs)); } const Edge& next_edge (size_t index) const noexcept { return next_edges_[index]; } void set_next_edge (size_t index, Edge edge) { next_edges_[index] = std ::move(edge); } void add_next_edge (Edge edge) { next_edges_.push_back(std ::move(edge)); } void set_next_edges (edge_list&& next_edges) { next_edges_ = std ::move(next_edges); } const edge_list& next_edges () const noexcept { return next_edges_; } edge_list& next_edges () noexcept { return next_edges_; } uint32_t num_outputs() const noexcept { return next_edges_.size(); } protected : static uint64_t& get_next_sequence_nr () ; virtual variable_list apply (variable_list&& inputs) = 0 ; variable_list traced_apply (variable_list inputs) ; const uint64_t sequence_nr_; edge_list next_edges_; PyObject* pyobj_ = nullptr ; std ::unique_ptr <AnomalyMetadata> anomaly_metadata_ = nullptr ; std ::vector <std ::unique_ptr <FunctionPreHook>> pre_hooks_; std ::vector <std ::unique_ptr <FunctionPostHook>> post_hooks_; at::SmallVector<InputMetadata, 2 > input_metadata_; };
Edge
的声明在csrc/autograd/edge.h
,它的声明就很简单了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 struct Edge { Edge() noexcept : function(nullptr ), input_nr(0 ) {} Edge(std ::shared_ptr <Function> function_, uint32_t input_nr_) noexcept : function(std ::move(function_)), input_nr(input_nr_) {} bool is_valid () const noexcept { return function != nullptr ; } bool operator ==(const Edge& other) const noexcept { return this ->function == other.function && this ->input_nr == other.input_nr; } bool operator !=(const Edge& other) const noexcept { return !(*this == other); } std ::shared_ptr <Function> function; uint32_t input_nr; };
总结一下,Variable
,AutogradMeta
,Function
和Edge
的大致关系如下图所示:
Engine
我们先不看怎么建立计算图,而是先假设图已经建立好,考虑具体怎么执行反向传播。你可能觉得答案已经很明显了,不就是对图进行遍历么,没错,是对图进行遍历,但考虑到效率以及要在不同设备上计算,实际操作起来还是有点麻烦的,这部分代码主要由autograd::Engine
实现,声明和实现分别在csrc/autograd/engine.h
和csrc/autograd/engine.cpp
中。
用 PyTorch 建立神经网络的时候,相信你一定用过loss.backward()
来进行反向传播,那就从Variable::backward()
函数开始一步一步看反向传播是怎么执行的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 void Variable::backward( c10::optional<Tensor> gradient, bool keep_graph, bool create_graph) const { auto autograd_meta = get_autograd_meta(); std ::vector <Edge> edges; edges.emplace_back(autograd_meta->grad_fn_, autograd_meta->output_nr_); std ::vector <Variable> inputs; inputs.push_back(std ::move(as_variable_ref(*gradient))); Engine::get_default_engine().execute(edges, inputs, keep_graph, create_graph); }
函数首先构造遍历的起点和输入,然后调用Engine::execute()
进行计算,其中Engine::get_default_engine()
返回的是Engine
的实例。
接着研究Engine::execute()
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 auto Engine::execute(const edge_list& roots, const variable_list& inputs, bool keep_graph, bool create_graph, const edge_list& outputs) -> variable_list { std ::call_once(start_threads_flag, &Engine::start_threads, this ); GraphTask graph_task (keep_graph, create_graph) ; std ::unique_lock<std ::mutex> lock(graph_task.mutex); auto graph_root = std ::make_shared<GraphRoot>(roots, inputs); compute_dependencies(graph_root.get(), graph_task); if (!outputs.empty()) { graph_task.init_to_execute(*graph_root, outputs); } ready_queue(-1 ).push(FunctionTask(&graph_task, std ::move(graph_root), InputBuffer(0 ))); if (worker_device == NO_DEVICE) { graph_task.not_done.wait(lock, [&graph_task]{ return graph_task.outstanding_tasks.load() == 0 ; }); } else { graph_task.owner = worker_device; lock.unlock(); thread_main(&graph_task); } return graph_task.captured_vars; }
Engine::execute()
里做了这几件事(可以把它想象成一个公司):
招聘员工(启动多线程)
明确整体任务(计算依赖)
准备第一份工作(把root
加入准备队列)
开始工作!
首先看启动多线程的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 auto Engine::start_threads() -> void { int num_devices = at::getNumGPUs(); int num_threads = num_devices + 1 ; ready_queues = std ::vector <std ::shared_ptr <ReadyQueue>>(num_threads); for (auto & queue : ready_queues) queue .reset(new ReadyQueue()); for (int i = 0 ; i < num_threads; ++i) { std ::thread t (&Engine::thread_init, this , i - 1 ) ; t.detach(); } }
接下来看如何计算依赖:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 auto Engine::compute_dependencies(Function* root, GraphTask& task) -> void { std ::unordered_set <Function*> seen; std ::vector <Function*> queue { root }; auto & dependencies = task.dependencies; while (!queue .empty()) { auto fn = queue .back(); queue .pop_back(); for (const auto & edge : fn->next_edges()) { if (auto next_ptr = edge.function.get()) { dependencies[next_ptr] += 1 ; const bool was_inserted = seen.insert(next_ptr).second; if (was_inserted) queue .push_back(next_ptr); } } } }
最后来看最主要的thread_main()
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 auto Engine::thread_main(GraphTask *graph_task) -> void { auto queue = ready_queues[worker_device + 1 ]; while (!graph_task || graph_task->outstanding_tasks > 0 ) { FunctionTask task = queue ->pop(); if (task.fn && !task.base->has_error.load()) { GradMode::set_enabled(task.base->grad_mode); try { evaluate_function(task); } catch (std ::exception& e) { thread_on_exception(task, e); } } auto base_owner = task.base->owner; if (base_owner == NO_DEVICE) { if (--task.base->outstanding_tasks == 0 ) { std ::lock_guard<std ::mutex> lock(task.base->mutex); task.base->not_done.notify_all(); } } else { if (base_owner == worker_device) { --task.base->outstanding_tasks; } else if (base_owner != worker_device) { if (--task.base->outstanding_tasks == 0 ) { std ::atomic_thread_fence(std ::memory_order_release); ready_queue(base_owner).push(FunctionTask(task.base, nullptr , InputBuffer(0 ))); } } } } }
thread_main()
里没有具体执行任务的代码,而是把它单独抽出变成一个方法,下面看该方法都干了什么:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 auto Engine::evaluate_function(FunctionTask& task) -> void { auto outputs = call_function(task); auto & fn = *task.fn; if (!task.base->keep_graph) { fn.release_variables(); } int num_outputs = outputs.size(); if (num_outputs == 0 ) return ; std ::lock_guard<std ::mutex> lock(task.base->mutex); for (int i = 0 ; i < num_outputs; ++i) { auto & output = outputs[i]; const auto & next = fn.next_edge(i); if (!next.is_valid()) continue ; bool is_ready = false ; auto & dependencies = task.base->dependencies; auto it = dependencies.find(next.function.get()); if (it == dependencies.end()) { } else if (--it->second == 0 ) { dependencies.erase(it); is_ready = true ; } auto & not_ready = task.base->not_ready; auto not_ready_it = not_ready.find(next.function.get()); if (not_ready_it == not_ready.end()) { InputBuffer input_buffer (next.function->num_inputs()) ; input_buffer.add(next.input_nr, std ::move(output)); if (is_ready) { auto & queue = ready_queue(input_buffer.device()); queue .push(FunctionTask(task.base, next.function, std ::move(input_buffer))); } else { not_ready.emplace(next.function.get(), std ::move(input_buffer)); } } else { auto &input_buffer = not_ready_it->second; input_buffer.add(next.input_nr, std ::move(output)); if (is_ready) { auto & queue = ready_queue(input_buffer.device()); queue .push(FunctionTask(task.base, next.function, std ::move(input_buffer))); not_ready.erase(not_ready_it); } } } }
好,至此执行计算图的代码就梳理完了,简单总结一下:
把调用backward()
的节点设为根节点,从该节点开始遍历
首先BFS一遍计算图,计算每个节点的依赖
为每个设备建立一个工作线程,每个线程里有一个准备队列
每个线程等待任务到来直到任务全部完成
完成一个任务后遍历与之相邻的节点,更新他们的依赖(-1),若依赖为0则加入准备队列
动态建立计算图
这一小节介绍 PyTorch 是怎么建立用于反向传播的计算图的。根据前面的分析,计算图一定是在定义前向传播是建立的,那么机密一定是藏在前向传播API里了,这个猜想是没错,但是由于 ATen 复杂的动态派发机制以及使用脚本生成代码,我也是费了九牛二虎之力才找到具体实现以及理解其中的逻辑。
神经网络的具体计算:每一层的前向和反向传播,其实都是在 ATen 里实现的,但回去看 ATen 的API实现,并没有发现其中有任何建立计算图的代码。这其中的机密实际在tools/autograd
目录里。这个目录里有derivatives.yaml
和用于生成代码的脚本,前者记录了所有需要自动微分的ATen API,后者为它们生成一层wrapper代码,这些代码主要干两件事:
把ATen的反向传播API转换成Function
在ATen的正向传播API中加入建图过程
举个例子,derivatives.yaml
第119行的addcmul()
API:
1 2 3 4 - name: addcmul(Tensor self, Tensor tensor1, Tensor tensor2, *, Scalar value) self: grad tensor1: grad * tensor2 * value tensor2: grad * tensor1 * value
这里指明了函数名、参数、反向计算方法。执行gen_autograd.py
可以自动为其生成代码,生成的代码在torch/csrc/autograd/generated
中,有 addcmul()
的前向传播的代码在VariableType0.cpp
中, 反向传播的代码在Functions.h
和Functionss.cpp
中。
前向传播
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 Tensor VariableType::addcmul(const Tensor & self, const Tensor & tensor1, const Tensor & tensor2, Scalar value) const { profiler::RecordFunction profiler ("addcmul" , Function::peek_at_next_sequence_nr()) ; auto & self_ = unpack(self, "self" , 0 ); auto & tensor1_ = unpack(tensor1, "tensor1" , 1 ); auto & tensor2_ = unpack(tensor2, "tensor2" , 2 ); std ::shared_ptr <AddcmulBackward> grad_fn; if (compute_requires_grad( self, tensor1, tensor2 )) { grad_fn = std ::shared_ptr <AddcmulBackward>(new AddcmulBackward(), deleteFunction); grad_fn->set_next_edges(collect_next_edges(self, tensor1, tensor2)); if (grad_fn->should_compute_output(1 )) { grad_fn->tensor2_ = SavedVariable(tensor2, false ); } grad_fn->value = value; if (grad_fn->should_compute_output(2 )) { grad_fn->tensor1_ = SavedVariable(tensor1, false ); } } auto tmp = ([&]() { at::AutoNonVariableTypeMode non_var_type_mode(true ); return baseType->addcmul(self_, tensor1_, tensor2_, value); })(); auto result = as_variable(tmp); set_history(flatten_tensor_args(result), grad_fn); return result; }
这个函数是前向计算的API,在具体计算之前先建立反向传播函数(节点),并且把该节点与输入 的节点相连;然后调用下层API计算结果;最后把结果和新建立的节点绑定,这样用于反向传播的计算图就建立完成了。由于这是反向传播计算图,所以前向传播中的输入节点变成反向的输出,前向的输出节点变成反向的输入,如下图所示。
反向传播
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 struct AddcmulBackward : public TraceableFunction { using TraceableFunction::TraceableFunction; variable_list apply (variable_list&& grads) override ; std ::string name () const override { return "AddcmulBackward" ; } void release_variables () override { tensor2_.reset_data(); tensor2_.reset_grad_function(); tensor1_.reset_data(); tensor1_.reset_grad_function(); } SavedVariable tensor2_; Scalar value; SavedVariable tensor1_; }; variable_list AddcmulBackward::apply(variable_list&& grads) { IndexRangeGenerator gen; auto self_ix = gen.range(1 ); auto tensor1_ix = gen.range(1 ); auto tensor2_ix = gen.range(1 ); variable_list grad_inputs (gen.size()) ; auto & grad = grads[0 ]; auto tensor2 = tensor2_.unpack(); auto tensor1 = tensor1_.unpack(); if (should_compute_output({ self_ix })) { auto grad_result = grad; copy_range(grad_inputs, self_ix, grad_result); } if (should_compute_output({ tensor1_ix })) { auto grad_result = grad * tensor2 * value; copy_range(grad_inputs, tensor1_ix, grad_result); } if (should_compute_output({ tensor2_ix })) { auto grad_result = grad * tensor1 * value; copy_range(grad_inputs, tensor2_ix, grad_result); } return grad_inputs; }
可以看到这些代码确确实实把 ATen API 转换成了计算图节点Function
。
动态派发
最后还有一个问题就是代码中调用的API是torch.addcmul()
或variable.addcmul()
怎么就会执行到上面的VariableType::addcmul()
呢?这就要归功于 ATen 的动态派发了,以第二种调用举例分析一下,也就是通过一个Variable
类型变量调用addcmul()
。
首先,上一篇说过,ATen 的API都记录在ATen/native/native_functions.yaml
里,这些API如果有生成 method 的需求的话,会把声明通过脚本自动添加 at::Tensor
类里,这样就可以通过tensor.addcmul()
调用该API,而Variable
继承自at::Tensor
,自然也就拥有了该方法。
但这还没完,如果查看这个方法的实现会发现:
1 2 3 inline Tensor Tensor::addcmul(const Tensor & tensor1, const Tensor & tensor2, Scalar value) const { return type().addcmul(*this , tensor1, tensor2, value); }
它调用了type()
的相应方法,这个type()
返回的是Type
类型的实例。Type
类型同样声明了所有 ATen API,并且有TensorType
VariableType
继承自它。根据ATen的动态派发机制,如果调用者是Tensor
的话,并且is_variable == False
,就会返回TensorType
实例;如果调用者是Variable
的话,或者is_variable == True
,就会返回上述的VariableType
实例。所以一个Variable
类型的变量调用addcmul()
的话实际会执行VariableType::addcmul()
。
つづく