这项来自斯坦福大学、加州大学伯克利分校、加州大学圣迭戈分校和 Meta 的研究提出了一个新颖的序列建模方法,称为测试时训练(Test-Time Training, TTT)层。TTT 层通过用机器学习模型取代 RNN 的隐藏状态,并使用输入 token 的实际梯度下降来压缩上下文。研究表明,这种方法在性能上可以优于现有的 Transformer 和 Mamba 架构。
TTT 层的设计亮点包括:
1. 替代自注意力机制:TTT 层直接取代了 Transformer 中的自注意力层,采用了线性模型(TTT-Linear)和两层 MLP(TTT-MLP)作为隐藏状态。
2. 线性复杂度架构:通过表达性记忆解锁线性复杂性架构,能够在上下文中训练具有数百万甚至数十亿个 token 的大语言模型(LLM)。
3. 更高效的上下文利用:TTT 层通过更高效的上下文压缩和模型记忆机制,相较于 RNN 层能更好地处理长上下文。
4. 更快的实际运行时间:尽管自注意力机制的复杂度是线性的,研究表明 TTT 层在实际运行时间上更快。
实验结果表明,TTT-Linear 和 TTT-MLP 在较大的上下文长度(如 8k token)下明显优于 Mamba 和 Transformer。具体观察到的优点包括:
- TTT 层在长上下文中的表现优于 Mamba,更具表现力并且更高效。
- 在相同 FLOPs 成本下,TTT-MLP 的困惑度(perplexity)比 TTT-Linear 更低,但由于 FLOPs 的额外成本,TTT-Linear 在效率方面有优势。
研究团队还在 JAX 和 PyTorch 上提供了代码,用于模型训练和测试,可以在以下链接中获取:
- JAX 代码:[https://github.com/test-time-training/ttt-lm-jax](https://github.com/test-time-training/ttt-lm-jax)
- PyTorch 推理代码:[https://github.com/test-time-training/ttt-lm-pytorch](https://github.com/test-time-training/ttt-lm-pytorch)
接下来我将对模型前半部分代码进行解析
定义配置类 TTTConfig
定义基础模块如 rotate_half
, permute_qk
等
定义 RMSNorm
, SwiGluMLP
, RotaryEmbedding
等组件类
定义 Conv
卷积层
定义 TTTCache
缓存机制
TTTConfig
类
定义了一个用于配置 TTT 模型的类 `TTTConfig`,还包含了一些辅助函数,用于处理查询和键张量的旋转位置嵌入。这些函数和类主要用于模型的配置和预处理步骤,就不详细给出实现了
辅助函数
rotate_half
函数
这个函数将输入张量 x
一分为二,然后交换这两部分的位置,并返回结果。
permute_qk
函数
这个函数对查询(q
)和键(k
)张量进行重新排序,以匹配 JAX 实现中的维度顺序。
undo_permute_qk
函数
def undo_permute_qk(q, k): bsz, num_head, seq_len, head_dim = q.shape q = q.reshape(bsz, num_head, seq_len, 2, head_dim // 2).transpose(3, 4).reshape(bsz, num_head, seq_len, head_dim) k = k.reshape(bsz, num_head, seq_len, 2, head_dim // 2).transpose(3, 4).reshape(bsz, num_head, seq_len, head_dim) return q, k
这个函数将 permute_qk
函数的操作逆转,恢复原始维度顺序。
apply_rotary_pos_emb
函数
def apply_rotary_pos_emb(q, k, cos, sin, position_ids=None, unsqueeze_dim=1): cos = cos.unsqueeze(unsqueeze_dim) sin = sin.unsqueeze(unsqueeze_dim) q_embed = (q * cos) + (rotate_half(q) * sin) k_embed = (k * cos) + (rotate_half(k) * sin) return q_embed, k_embed
这个函数将旋转位置嵌入应用于查询(q
)和键(k
)张量。通过对 cos
和 sin
张量进行适当的扩展,以便它们可以正确地广播到 q
和 k
的维度上。
RMSNorm类:
主要用于对输入张量进行Root Mean Square Layer Normalization。初始化方法定义了权重和防止除零的小常数。前向传播方法对输入张量进行归一化处理,并乘以权重SwiGluMLP类:
这是一个使用SiLU激活函数和Gated Linear Units (GLU)的多层感知机。初始化方法定义了相关的线性层和激活函数。前向传播方法根据配置中的pretraining_tp
值,对输入张量进行切片和线性变换操作。 import torchfrom torch import nnimport torch.nn.functional as F# 激活函数字典,假设在其他地方定义ACT2FN = { "silu": torch.nn.SiLU(), # 其他激活函数可以在这里添加}class RMSNorm(nn.Module): def __init__(self, hidden_size, eps=1e-6): """初始化RMSNorm层。 Args: hidden_size (int): 隐藏层维数。 eps (float): 防止除零的小常数。 """ super().__init__() self.weight = nn.Parameter(torch.ones(hidden_size)) # 初始化权重参数为全1 self.variance_epsilon = eps # 设置epsilon值 def forward(self, hidden_states): """前向传播函数。 Args: hidden_states (torch.Tensor): 输入隐藏状态张量。 Returns: torch.Tensor: 正则化后的隐藏状态张量。 """ input_dtype = hidden_states.dtype # 保存输入张量的dtype hidden_states = hidden_states.to(torch.float32) # 将输入转换为float32 variance = hidden_states.pow(2).mean(-1, keepdim=True) # 计算方差 hidden_states = hidden_states * torch.rsqrt(variance + self.variance_epsilon) # 进行方差归一化 return self.weight * hidden_states.to(input_dtype) # 恢复原始dtype并乘以权重class SwiGluMLP(nn.Module): def __init__(self, config): """初始化SwiGluMLP层。 Args: config (TTTConfig): 模型配置对象。 """ super().__init__() self.config = config self.hidden_size = config.hidden_size # 从配置中获取隐藏层大小 self.intermediate_size = config.intermediate_size # 从配置中获取中间层大小 self.gate_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False) # 定义线性层 self.up_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False) # 定义线性层 self.down_proj = nn.Linear(self.intermediate_size, self.hidden_size, bias=False) # 定义线性层 self.act_fn = ACT2FN[config.hidden_act] # 从激活函数字典中获取激活函数 def forward(self, x): """前向传播函数。 Args: x (torch.Tensor): 输入张量。 Returns: torch.Tensor: 输出张量。 """ if self.config.pretraining_tp > 1: # 如果预训练tp值大于1,进行切分操作 slice = self.intermediate_size // self.config.pretraining_tp # 计算切分大小 gate_proj_slices = self.gate_proj.weight.split(slice, dim=0) # 切分gate_proj权重 up_proj_slices = self.up_proj.weight.split(slice, dim=0) # 切分up_proj权重 down_proj_slices = self.down_proj.weight.split(slice, dim=1) # 切分down_proj权重 # 分别对各个切片进行线性变换,然后拼接 gate_proj = torch.cat( [F.linear(x, gate_proj_slices[i]) for i in range(self.config.pretraining_tp)], dim=-1, ) up_proj = torch.cat( [F.linear(x, up_proj_slices[i]) for i in range(self.config.pretraining_tp)], dim=-1, ) # 计算中间状态,并将其切分 intermediate_states = (self.act_fn(gate_proj) * up_proj).split(slice, dim=2) # 对每个切片进行线性变换,然后相加 down_proj = [ F.linear(intermediate_states[i], down_proj_slices[i]) for i in range(self.config.pretraining_tp) ] down_proj = sum(down_proj) # 将所有切片相加 else: # 如果预训练tp值不大于1,直接进行线性变换和激活函数计算 down_proj = self.down_proj(self.act_fn(self.gate_proj(x)) * self.up_proj(x)) return down_proj # 返回输出张量
class RotaryEmbedding
import torchfrom torch import nnclass RotaryEmbedding(nn.Module): def __init__( self, dim, max_position_embeddings=16, base=10000, device=None, scaling_factor=1.0, ): """初始化RotaryEmbedding层。 Args: dim (int): 位置嵌入的维度。 max_position_embeddings (int, optional): 最大位置嵌入数。默认值为16。 base (int, optional): 基数。默认值为10000。 device (torch.device, optional): 设备。默认值为None。 scaling_factor (float, optional): 缩放因子。默认值为1.0。 """ super().__init__() self.scaling_factor = scaling_factor # 设置缩放因子 self.dim = dim # 位置嵌入的维度 self.max_position_embeddings = max_position_embeddings # 最大位置嵌入数 self.base = base # 基数 # 计算逆频率 inv_freq = 1.0 / (self.base ** (torch.arange(0, self.dim, 2, dtype=torch.int64).float().to(device) / self.dim)) # 注册为buffer,以便在模型保存和加载时保持不变 self.register_buffer("inv_freq", inv_freq, persistent=False) @torch.no_grad() def forward(self, x, position_ids): """前向传播函数。 Args: x (torch.Tensor): 输入张量,形状为[bs, num_attention_heads, seq_len, head_size]。 position_ids (torch.Tensor): 位置ID张量。 Returns: tuple: 返回余弦和正弦位置嵌入张量。 """ # 扩展逆频率张量的维度,以适应输入张量的形状 inv_freq_expanded = self.inv_freq[None, :, None].float().expand(position_ids.shape[0], -1, 1) # 扩展位置ID张量的维度 position_ids_expanded = position_ids[:, None, :].float() # 强制使用float32,以避免bfloat16在长上下文中失去精度 # 参见https://github.com/huggingface/transformers/pull/29285 device_type = x.device.type device_type = device_type if isinstance(device_type, str) and device_type != "mps" else "cpu" # 使用不自动混合精度的上下文进行计算 with torch.autocast(device_type=device_type, enabled=False): # 计算频率嵌入 freqs = (inv_freq_expanded.float() @ position_ids_expanded.float()).transpose(1, 2) # 将频率嵌入沿最后一个维度拼接 emb = torch.cat((freqs, freqs), dim=-1) # 计算余弦和正弦位置嵌入 cos = emb.cos() sin = emb.sin() # 返回余弦和正弦位置嵌入张量,并将它们的dtype设置为输入张量的dtype return cos.to(dtype=x.dtype), sin.to(dtype=x.dtype)
class Conv
import torchfrom torch import nnimport torch.nn.functional as F# 假设我们已经定义了RMSNorm类class Conv(nn.Module): def __init__(self, config, layer_idx): """初始化Conv层。 Args: config (TTTConfig): 模型配置对象。 layer_idx (int): 当前层的索引。 """ super().__init__() self.config = config self.layer_idx = layer_idx # 定义RMSNorm层 self.norm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps) # 定义一维卷积层 self.conv = nn.Conv1d( config.hidden_size, # 输入通道数 config.hidden_size, # 输出通道数 bias=True, # 是否使用偏置 kernel_size=config.conv_kernel, # 卷积核大小 groups=config.hidden_size, # 组数(组卷积) padding=config.conv_kernel - 1, # 填充大小 ) def __call__(self, hidden_states, cache_params=None): """前向传播函数。 Args: hidden_states (torch.Tensor): 输入隐藏状态张量。 cache_params (dict, optional): 缓存参数。默认值为None。 Returns: torch.Tensor: 输出隐藏状态张量。 """ seq_len = hidden_states.shape[1] hidden_states = self.norm(hidden_states) # 归一化 hidden_states = hidden_states.transpose(1, 2) # 转换形状以适应Conv1d # 如果没有定义causal_conv1d_fn if causal_conv1d_fn is None: if cache_params is not None: if cache_params.seqlen_offset > 0: # 处理缓存状态 conv_state = cache_params.conv_states_dic["pre_conv"][self.layer_idx] conv_state = torch.roll(conv_state, shifts=-1, dims=-1) conv_state[:, :, -1] = hidden_states[:, :, 0] cache_params.conv_states_dic["pre_conv"][self.layer_idx].copy_(conv_state) hidden_states = torch.sum(conv_state * self.conv.weight[:, 0, :], dim=-1) hidden_states += self.conv.bias hidden_states = hidden_states.unsqueeze(-1) else: conv_state = nn.functional.pad( hidden_states, (self.config.conv_kernel - hidden_states.shape[-1], 0), ) cache_params.conv_states_dic["pre_conv"][self.layer_idx].copy_(conv_state) hidden_states = self.conv(hidden_states)[..., :seq_len] else: hidden_states = self.conv(hidden_states)[..., :seq_len] else: conv_weights = self.conv.weight.view(self.conv.weight.size(0), self.conv.weight.size(2)) if cache_params is not None and cache_params.seqlen_offset > 0: hidden_states = causal_conv1d_update( hidden_states.squeeze(-1), cache_params.conv_states_dic["pre_conv"][self.layer_idx], conv_weights, self.conv.bias, None, ) hidden_states = hidden_states.unsqueeze(-1) else: if cache_params is not None: conv_states = nn.functional.pad( hidden_states, (self.config.conv_kernel - hidden_states.shape[-1], 0), ) cache_params.conv_states_dic["pre_conv"][self.layer_idx].copy_(conv_states) hidden_states = causal_conv1d_fn(hidden_states, conv_weights, self.conv.bias, activation=None) hidden_states = hidden_states.transpose(1, 2) # 转换形状以适应后续层 return hidden_states############################ TTT Layer Modules ############################def scan(f, init, xs, out, checkpoint_group=0): """模拟jax.lax.scan函数。 Args: f (function): 扫描函数。 init (any): 初始化值。 xs (list or dict): 输入数据。 out (list): 输出数据。 checkpoint_group (int, optional): 检查点组的大小。默认值为0。 Returns: any: 最终的carry值。 list: 输出数据。 """ carry = init if isinstance(xs, dict): num_items = len(next(iter(xs.values()))) else: num_items = len(xs[0]) def scan_fn(carry, i_start, i_end): for i in range(i_start, i_end): if isinstance(xs, dict): x = {key: tensor[i] for key, tensor in xs.items()} else: x = [x[i] for x in xs] carry, y = f(carry, x) out[i] = y return carry if checkpoint_group > 0: ckpt_every_n = num_items // checkpoint_group for k in range(0, num_items, ckpt_every_n): carry = torch.utils.checkpoint.checkpoint( scan_fn, carry, k, min(k + ckpt_every_n, num_items), use_reentrant=False ) else: carry = scan_fn(carry, 0, num_items) return carry, outdef ln_fwd(x, gamma, beta, eps=1e-6): """层归一化的前向传播。 Args: x (torch.Tensor): 输入张量。 gamma (torch.Tensor): 缩放参数。 beta (torch.Tensor): 平移参数。 eps (float, optional): 防止除零的小常数。默认值为1e-6。 Returns: torch.Tensor: 归一化后的张量。 """ mu = x.mean(dim=-1, keepdim=True) # 计算均值 var = x.var(dim=-1, keepdim=True, unbiased=False) # 计算方差 std = torch.sqrt(var + eps) # 计算标准差 x_hat = (x - mu) / std # 标准化 y = gamma * x_hat + beta # 缩放和平移 return ydef ln_fused_l2_bwd(x, l2_target, gamma, beta, eps=1e-6): """融合L2损失的层归一化的反向传播。 Args: x (torch.Tensor): 输入张量。 l2_target (torch.Tensor): L2目标张量。 gamma (torch.Tensor): 缩放参数。 beta (torch.Tensor): 平移参数。 eps (float, optional): 防止除零的小常数。默认值为1e-6。 Returns: torch.Tensor: 梯度张量。 """ D = x.shape[-1] mu = x.mean(dim=-1, keepdim=True) # 计算均值 var = x.var(dim=-1, keepdim=True, unbiased=False) # 计算方差 std = torch.sqrt(var + eps) # 计算标准差 x_hat = (x - mu) / std # 标准化 y = gamma * x_hat + beta # 缩放和平移 grad_output = y - l2_target # 计算输出的梯度 grad_x_hat = grad_output * gamma # 计算标准化后的梯度 z = ( (1.0 / D) * ( D * grad_x_hat - grad_x_hat.sum(dim=-1, keepdim=True) - x_hat * (grad_x_hat * x_hat).sum(dim=-1, keepdim=True) ) / std ) # 计算最终的梯度 return z# 修改自 https://github.com/NVIDIA/Megatron-LM/blob/e33c8f78a35765d5aa37475a144da60e8a2349d1/megatron/core/fusions/fused_bias_gelu.py#L26def gelu_bwd(x): """GELU激活函数的反向传播。 Args: x (torch.Tensor): 输入张量。 Returns: torch.Tensor: 梯度张量。 """ tanh_out = torch.tanh(0.79788456 * x * (1 + 0.044715 * x * x)) # 计算tanh输出 ff = 0.5 * x * ((1 - tanh_out * tanh_out) * (0.79788456 + 0.1070322243 * x * x)) + 0.5 * (1 + tanh_out) # 计算梯度 return ff
Conv
类:
scan
函数:
jax.lax.scan
函数的行为,通过逐步应用函数 f
处理输入数据,并支持检查点机制以节省内存。 ln_fwd
函数:
Class TTTCache
import torchfrom torch import nnfrom collections import defaultdictimport logginglogger = logging.getLogger(__name__)class TTTCache: """ TTTCache 是一个用于存储 TTT 层的最后隐藏状态和梯度的数据结构。 Arguments: model (TTTModel): 用于初始化缓存的模型。 batch_size (int): 批处理大小。 Attributes: seqlen_offset (int): 序列长度偏移量。 mini_batch_size (int): 微批处理大小。 params_dict (Dict[str, Dict[int, torch.Tensor]]): 以字典形式保存的参数,*_states, *_grad -> 层索引 -> [批处理大小, ...] conv_states_dic (Dict[str, Dict[int, torch.Tensor]]): 以字典形式保存的卷积状态,*_states -> 层索引 -> [批处理大小, ...] """ def __init__(self, model, batch_size: int): config = model.config self.seqlen_offset = 0 # 初始化序列长度偏移量 self.mini_batch_size = config.mini_batch_size # 获取微批处理大小 # 初始化TTT参数字典 self.ttt_params_dict = defaultdict(dict) if "linear" in config.ttt_layer_type: # 如果TTT层类型是线性,则使用W1和b1参数 self.ttt_param_names = ["W1", "b1"] elif "mlp" in config.ttt_layer_type: # 如果TTT层类型是MLP,则使用W1, b1, W2, b2参数 self.ttt_param_names = ["W1", "b1", "W2", "b2"] else: raise ValueError(f"TTT层类型 {config.ttt_layer_type} 目前不支持") # 初始化卷积状态字典 self.conv_states_dic = defaultdict(dict) logger.info(f"Creating cache of size: {batch_size}") for layer_idx in range(config.num_hidden_layers): for name in self.ttt_param_names: weight = getattr(model.layers[layer_idx].seq_modeling_block, name) # 将权重扩展至批处理大小 tiled_weight = torch.tile(weight.unsqueeze(0), (batch_size,) + (1,) * weight.dim()).to(model.device) self.ttt_params_dict[f"{name}_states"][layer_idx] = tiled_weight # 对于解码,需要存储梯度 self.ttt_params_dict[f"{name}_grad"][layer_idx] = torch.zeros_like(tiled_weight) if config.pre_conv: self.conv_states_dic["pre_conv"][layer_idx] = torch.zeros( batch_size, config.hidden_size, config.conv_kernel, device=model.device, ) if config.share_qk: self.conv_states_dic["ttt_conv_q"][layer_idx] = torch.zeros( batch_size, config.hidden_size, config.conv_kernel, device=model.device, ) self.conv_states_dic["ttt_conv_k"][layer_idx] = torch.zeros( batch_size, config.hidden_size, config.conv_kernel, device=model.device, ) def update(self, py_tree, layer_idx, seq_len): """更新缓存中的参数。 Args: py_tree (dict): 参数树。 layer_idx (int): 层索引。 seq_len (int): 序列长度。 """ if seq_len % self.mini_batch_size == 0: # 复制最后一个微批处理的状态,清空梯度 for name in self.ttt_param_names: self.ttt_params_dict[f"{name}_states"][layer_idx].copy_(py_tree[f"{name}_states"]) self.ttt_params_dict[f"{name}_grad"][layer_idx].zero_() elif seq_len < self.mini_batch_size: if seq_len != 1 and self.seqlen_offset > 0 and self.seqlen_offset % self.mini_batch_size != 0: raise ValueError("不支持分段更新") if (seq_len + self.seqlen_offset) % self.mini_batch_size == 0: # 复制最后一个微批处理的状态,清空梯度 for name in self.ttt_param_names: self.ttt_params_dict[f"{name}_states"][layer_idx].copy_(py_tree[f"{name}_states"]) self.ttt_params_dict[f"{name}_grad"][layer_idx].zero_() else: # 复制梯度以供下次更新使用 for name in self.ttt_param_names: self.ttt_params_dict[f"{name}_grad"][layer_idx].copy_(py_tree[f"{name}_grad"]) else: raise ValueError(f"序列长度 {seq_len} 是不支持的部分更新") def ttt_params_to_dict(self, layer_idx): """将特定层的TTT参数转化为字典格式。 Args: layer_idx (int): 层索引。 Returns: dict: 包含特定层TTT参数的字典。 """ return {name: self.ttt_params_dict[name][layer_idx] for name in self.ttt_params_dict}
__init__
方法:
update
方法:
ttt_params_to_dict
方法: