Skip to content

机器学习实践:ALS的矩阵分解-Part II

在之前「机器学习实践:ALS的矩阵分解」中,我们通过ALS矩阵分解,对用户兴趣标签进行向量化,得到了接近word2vec的效果。

但是遗留了一个重要的问题,矩阵分解模型是对user_iditem_id进行建模的,只有把对全量的id的数据都喂给模型,才能获得id向量。这个id向量是静态的,一次性的,但在推荐系统使用时,建模数据每天变化,user_iditem_id集合也是不断变化的,导致ALS直接使用并不方便,两方面的挑战:

  • 如果只关心item的向量,可以对user集合进行了10%的抽样,但如果需要user集合的向量,就需要对全量数据分解,训练时间就会大大增加。
  • 行为数据每天更新,每次训练ALS学出来的向量都会与旧数据完全不同,没有连续性。

这个挑战类似于GNN之于node2vec的改进,也就是 inductive(不使用node_id)和 transductive(使用node_id)的区别。

那么,这篇讨论的问题是,ALS算法能不能改造成inductive的? 也就是能否满足下面两点性质,

P1. 推导性

使用1/10的用户样本,学习item向量表达,然后在预估阶段,只传入user互动的item序列(不提供user_id),直接预估用户向量

P2. 连续性

保持物品向量是相对固定,随着用户兴趣序列的连续变化,用户向量的变化也是连续的

1、目标解析

回顾一下ALS算法的步骤,

  • Step1: 固定全部的物品向量itemFactors,传入每个用户对物品的打分user_ratings_list,计算出一个用户向量;
  • Step2: 固定全部用户向量userFactors,对打分记录group by item,算出每个item被互动过的记录item_ratings_list,重新计算这个物品的向量;
  • 当迭代一定次数后,这两个向量集合都会收敛。

因此,我们可以手动实现Step1的计算逻辑,加载训练好的itemFactors,在线上使用时,传入每一个用户的user_rating_list集合,就可以计算出全部的userFactors了。

2、代码实现

参考隐式反馈ALS的论文Collaborative Filtering for Implicit Feedback Datasets,以及Spark ALS的源代码,用numpy进行复现,并没有想象中的复杂,简单来说,

  • 将用户互动过的items的factors取出,加和为一个atb向量,维度是[rank, 1]
  • 将用户互动过的items的factors,计算外积,乘以alpha(>1),没互动过的items按权重1,计算外积,加和到一起,最后叠加一个正则项,形成一个ata矩阵,维度是[rank, rank]
  • 求解(ata)X = atb的解X向量,维度[rank, 1],就是这个用户的向量

这个解法推导过程是最小二乘法的正规方程/法方程(Normal Equation)的求解:

$$ (Y^tC_uY + \lambda I) x_u = Y^tC_up(u) $$ 其中

\[ p(u)[i] = p_{ui} = \begin{cases} 1 & r_{ui}>0 \\ 0 & r_{ui}=0 \end{cases} \]

\(Y\)是所有itemFactors经过stack得到的矩阵,\(C_u[i] = c_{ui} = 1 + \alpha r_{ui}\),调节正样本权重的,\(x_u\)为待定用户向量。

def solve_normal_equation(ata, atb, regParam, rank):
  """ 使用 Cholesky 分解求解正规方程 """
  try:
    ata += np.eye(rank) * regParam
    L = np.linalg.cholesky(ata)
    L_inv = np.linalg.inv(L)
    user_factors = np.dot(np.dot(L_inv.T, L_inv), atb)
  except np.linalg.LinAlgError:
    user_factors = np.zeros(rank)
  return user_factors

def compute_user_factors(user_ratings_list, item_factors_dict, YtY, rank=24, alpha=500, regParam=0.01):
  """ 计算用户的User向量 """
  ata = YtY.copy() # 加载预先计算itemFactors的外积Gramian矩阵
  atb = np.zeros(rank)

  for item_code, weight in user_ratings_list:
    item_factor = np.array(item_factors_dict.get(item_code, np.zeros(rank)))
    if weight > 0:
      c_ui = alpha * abs(weight)
      c_minus_1 = c_ui - 1.0
      ata += np.outer(item_factor, item_factor) * c_minus_1 # rank x rank
      atb += item_factor * c_ui # rank x 1

  return solve_normal_equation(ata, atb, regParam, rank) # rank x 1
预计算 \(Y^t * Y\)来加速

在单步更新函数 compute_user_factors 中,对于每个用户 u,需要计算 \(Y^t * C_u * Y\),这里的\(Y\)是全量的items的向量,包括用户点击的,以及未点击的,\(C_u\)控制正负例的样本权重,可以通过下面的公式改写,

\[ Y^t * C_u * Y = Y^t * Y + Y^t * (C_u - I) * Y \]

这里\(Y^t*Y\)对所有用户相同,可以预先计算出来,而 \((C_u - I)\) 矩阵只在用户有过正向行为 (weight > 0) 的 item 上有非零值,因此我们只需要遍历用户的正向行为列表,即可完成计算,避免了遍历所有 item。

def precompute_yt_y(item_factors_df):
  """ 
  预计算 Y^t * Y (item factor Gramian 矩阵)

  Args:
    item_factors_df: DataFrame, 预训练的物品因子 (item_code, factors)
    rank: int, ALS rank

  Returns:
    numpy.ndarray: Y^t * Y 矩阵 (rank x rank)
  """
  item_factors_array = np.stack(item_factors_df["features"])
  YtY = np.dot(item_factors_array.T, item_factors_array)
  return YtY

# %%
item_factors_df = model.itemFactors.toPandas().set_index("id").sort_index()
YtY = precompute_yt_y(item_factors_df)

YtY矩阵可视化,基本是rank=24维的对角阵(疑问:说明了啥?)

YtY矩阵可视化,基本是rank维度的对角阵

我们抽一个spark ALS训练的某个用户的向量,

>>> np.array(model.userFactors.where("id=14547221").toPandas().iloc[0]["features"])
array([ 0.78530389,  2.91513634,  4.5470233 ,  0.46865407, -4.87767458,
       -4.6883378 ,  1.06418002,  1.50253391,  1.69637954,  1.33667183,
        0.65817267,  0.22354223,  0.33607042, -1.89827347,  1.35138667,
       -2.12687564,  2.22609329, -4.16046667,  0.58304012,  0.17965284,
       -2.55924273, -0.49272653, -1.48579204,  0.86427361])

然后获取这个用户的互动物品列表,

>>> user_ratings_list = train_data.where("user_code==14547221").select("tag_code", "weight").rdd.map(lambda row: (row["tag_code"], row["weight"])).collect()
>>> len(user_ratings_list)
111
>>> user_ratings_list
[(6, 0.9010954778750403),
 (823, 0.8896817764827138),
 (503, 0.2585840575047922),
 (674, 0.309710058256),
 (686, 0.16724848608006257),
 (2297, 0.9996000599959997),
 (15, 0.8867539903123458),
 (136, 0.9996000599959997),
...
 (25126, 0.41185745066066415),
 (672, 0.12977288641920165),
 (84, 0.22120244505228956),
 (237, 0.16259040062500002),
 (445, 0.9988005398920082)]

使用上面的正规方程单步求解

>>> item_factors_dict = model.itemFactors.toPandas().set_index("id")["features"].to_dict()
>>> user_factor = compute_user_factors(user_ratings_list, item_factors_dict, YtY, rank=24, alpha=500, regParam=0.01)
>>> user_factor
array([ 0.86048751,  2.9873233 ,  4.59823802,  0.51473747, -5.0001467 ,
       -4.79051432,  1.1279331 ,  1.49323535,  1.65280927,  1.32899367,
        0.66127905,  0.20627895,  0.36428214, -1.86145192,  1.26869297,
       -2.19555517,  2.19837418, -4.19768464,  0.68737867,  0.21501057,
       -2.66944692, -0.53057997, -1.46352125,  0.84527976])

结论:代码一致

从结果上看,numpy的实现与Spark基本一致的,误差是因为单步更新也会存在区别(即iter=10更新到iter=11)。

3、平均池化

推荐系统中,处理用户的互动序列,一般做法是将互动过的物品的factors加和平均(也就是Average Pooling),作为用户向量。我就在想,正规方程的方法,会比这个baseline好吗?

如果看公式的话,会发现,正规方程的右侧项atb,其实就是Average Pooling的结果(乘以了固定的\(\alpha\)倍数),ALS求解出的向量,等价于对均值向量做了一次矩阵变换

矩阵变换ata

因为ata是实对称矩阵,所以也可说做了一系列初等变换,例如旋转,缩放。

代码实现一下平均池化,能看到虽然这个向量与ALS的解看起来差别比较大,但在各个维度上的正负基本一致

# average pooling by weight
>>> def compute_user_factors_avg(user_ratings_list, item_factors_dict, rank=24):
  """ 计算用户的User向量 """
  user_factor = np.zeros(rank)
  for item_code, weight in user_ratings_list:
    item_factor = np.array(item_factors_dict.get(item_code, np.zeros(rank)))
    user_factor += item_factor * weight
  return user_factor
# %%
>>> user_factor = compute_user_factors_avg(user_ratings_list, item_factors_dict, rank=24)
>>> user_factor
array([ 0.01901355,  0.66206921,  1.87768719, -0.01015083, -1.08162041,
       -1.46849039,  0.06769722,  1.09600794,  0.77648112,  0.61039477,
       -0.01970515,  0.11000903, -0.16037077, -1.09661923,  1.32855895,
       -1.2368685 ,  1.20337588, -1.33260167, -0.51772706, -0.39698925,
       -0.39718755,  0.12036192, -1.08050579,  0.29169133])

更进一步,我们直接看向量对item的召回结果,能不能把互动过的排在前面

>>> user_factors = {}
>>> user_factors["als"] = compute_user_factors(user_ratings_list, item_factors_dict, YtY, rank=24, alpha=500, regParam=0.01)
>>> user_factors["avg"] = compute_user_factors_avg(user_ratings_list, item_factors_dict, rank=24)

计算用户向量和物品向量的内积,

>>> user_item_scores = {}
>>> user_item_scores["als"] = {item_code: np.dot(item_factors_dict[item_code], user_factors["als"]) for item_code in item_factors_dict.keys()}
>>> user_item_scores["avg"] = {item_code: np.dot(item_factors_dict[item_code], user_factors["avg"]) for item_code in item_factors_dict.keys()}
>>> user_item_scores["als"]
{1: 0.905902111155681,
  2: 0.7807423250756385,
  3: 0.7112349183189641,
  4: 0.9707872645534267,
  5: 0.962365519610098,
  6: 1.0482348491654447,
  7: 1.0664618361467029,
  8: 0.89448296438726,
  9: 0.9782683066408783,
 ...
  1120: 0.19403266496754024,
  1121: 0.013655477921710242,
  1122: 0.02746869119299521,
  1125: 0.14710482464524885,
...}

使用AUC和NGCG衡量召回效果,

from sklearn.metrics import roc_auc_score, ndcg_score

def compute_auc(user_item_scores, user_ratings_list, threshold=0.5, topk=1000):
  positive_items = set([item_code for item_code, weight in user_ratings_list if weight > threshold])
  y_true = np.array([1 if item_code in positive_items else 0 for item_code in sorted(user_item_scores.keys())])
  y_scores = np.array([x[1] for x in sorted(user_item_scores.items(), key=lambda x: x[0])])
  return roc_auc_score(y_true[:topk], y_scores[:topk])

def compute_ndcg(user_item_scores, user_ratings_list):
  positive_items = set([item_code for item_code, weight in user_ratings_list if weight > 0])
  y_true = np.array([1 if item_code in positive_items else 0 for item_code in sorted(user_item_scores.keys())])
  y_scores = np.array([x[1] for x in sorted(user_item_scores.items(), key=lambda x: x[0])])
  return ndcg_score([y_true], [y_scores])

for method in ["als", "avg"]:
  print(f"Method {method}:", \
        "AUC =", compute_auc(user_item_scores[method], user_ratings_list), \
        "NDCG=", compute_ndcg(user_item_scores[method], user_ratings_list))

效果对比

随机测试不同用户,对应不同的点击序列长度,

# length = 10, user_code = 150812886
Method als: AUC = 0.995995995995996 NDCG= 0.4144082736048081
Method avg: AUC = 0.993993993993994 NDCG= 0.608448740547803
# length = 30, user_code = 238273242
Method als: AUC = 0.9728643216080403 NDCG= 0.7903073580861308
Method avg: AUC = 0.9694472361809046 NDCG= 0.8106331194192624
# length = 50, user_code = 170100056
Method als: AUC = 0.9974899598393574 NDCG= 0.8167601868987228
Method avg: AUC = 0.9944779116465864 NDCG= 0.8004696585332196
# length = 58, user_code = 1175688
Method als: AUC = 0.998998998998999 NDCG= 0.8357972978136222
Method avg: AUC = 0.994994994994995 NDCG= 0.8201008728129268
# length = 92, user_code = 1175655
Method als: AUC = 0.9825620389000671 NDCG= 0.7287425744703396
Method avg: AUC = 0.9787055667337357 NDCG= 0.7425987640451196
# length = 99, user_code = 1175861
Method als: AUC = 0.98582372453009 NDCG= 0.8999322337615655
Method avg: AUC = 0.9718448902227136 NDCG= 0.8376076440967868
# length = 100, user_code = 257167813
Method als: AUC = 0.978099914270127 NDCG= 0.8152207296087474
Method avg: AUC = 0.9627464733847713 NDCG= 0.8140765430415308
# length = 111, user_code = 14547221
Method als: AUC = 0.9045047031158143 NDCG= 0.7504971409072382
Method avg: AUC = 0.9173647854203411 NDCG= 0.8386996059188151
# length = 116, user_code = 1175641
Method als: AUC = 0.9844209288653734 NDCG= 0.9126388498716672
Method avg: AUC = 0.981848912404468 NDCG= 0.885638097443387
# length = 147, user_code = 1175654
Method als: AUC = 0.9363242210464433 NDCG= 0.8776266091078719
Method avg: AUC = 0.9240887713109935 NDCG= 0.877695144038602
# length = 150, user_code = 110551252
Method als: AUC = 0.9880358388325554 NDCG= 0.8479037695140659
Method avg: AUC = 0.9690970545630131 NDCG= 0.8638332538486885
# length = 157, user_code = 1175700
Method als: AUC = 0.9286604440601237 NDCG= 0.8693871584800122
Method avg: AUC = 0.9138838729367481 NDCG= 0.8575589074997558
# length = 200, user_code = 227412362
Method als: AUC = 0.9237138830162086 NDCG= 0.8957874068824522
Method avg: AUC = 0.9150418917860779 NDCG= 0.8721873058903625
# length = 300, user_code = 217716448
Method als: AUC = 0.9623493975903614 NDCG= 0.8330069094473749
Method avg: AUC = 0.9352409638554217 NDCG= 0.8028611211046232

大致抽样这些,能看到ALS比平均向量 略好,但差距不大,均足够区分点击的物品和未点击的物品。

结论:平均池化够了

线上的应用可以直接加载ALS训练出的itemFactors,使用平均池化,能够快速输出所有的用户向量。

4、随机序列

上面的效果让我对Average Pooling处理序列的能力有了改观,以前认为这个方式一定是牺牲了很多信息,但其实还原度也还挺高了。侧面印证了,在计算兴趣模型的时候,将用户的兴趣标签EmbeddingBag直接.mean(),然后接MLP层的合理性。

但是,当我开始用随机生成的user_ratings_list,而不是真实用户的数据去评估还原效果的时候,情况变得有些不一样。

随机序列还原失效

user_ratings_list = [(i, 1) for i in range(30000) if random.random() > 0.999]

这个过程模拟一个随机游走的用户,点点这里,点点那里。调整items集合大小和抽样概率,可以构造出长短不同的随机互动序列,

# length = 13, size = 2000
Method als: AUC = 0.5865007287812535 NDCG= 0.27488864679703884
Method avg: AUC = 0.6422244646260791 NDCG= 0.37924960794552126
# length = 23, size = 10000
Method als: AUC = 0.8558558558558559 NDCG= 0.25959571199327536
Method avg: AUC = 0.975975975975976 NDCG= 0.2651492202387803
# length = 25, size = 10000
Method als: AUC = 0.5831663326653307 NDCG= 0.26425784007786246
Method avg: AUC = 0.6953907815631263 NDCG= 0.2802458261479761
# length = 25, size = 10000
Method als: AUC = 0.630558341691742 NDCG= 0.25840986129021026
Method avg: AUC = 0.5359411568037444 NDCG= 0.26434156330992997
# length = 26, size = 10000
Method als: AUC = 0.9019019019019019 NDCG= 0.2718959506767607
Method avg: AUC = 0.9099099099099099 NDCG= 0.28101336423854395
# length = 31, size = 10000
Method als: AUC = 0.8281511200267468 NDCG= 0.2744436643358278
Method avg: AUC = 0.8000668672684721 NDCG= 0.28238772597296335
# length = 34, size = 20000
Method als: AUC = 0.7990981963927857 NDCG= 0.2821654719607691
Method avg: AUC = 0.8156312625250501 NDCG= 0.28426755625658356
# length = 43, size = 20000
Method als: AUC = 0.6907388833166166 NDCG= 0.285178843906648
Method avg: AUC = 0.6974256101638249 NDCG= 0.2885199415839827
# length = 50, size = 20000
Method als: AUC = 0.6268686868686868 NDCG= 0.3620008834033333
Method avg: AUC = 0.7128282828282828 NDCG= 0.37804301617334146
# length = 100, size = 20000
Method als: AUC = 0.39839034205231383 NDCG= 0.3964609304394006
Method avg: AUC = 0.45875251509054327 NDCG= 0.42364949676832636
# length = 208, size = 20000
Method als: AUC = 0.5538737526628545 NDCG= 0.4470703864640777
Method avg: AUC = 0.5671039354187689 NDCG= 0.45948063867674394

现象是,

  • 当序列长度增加到50+,ALS和平均池化向量均失去区分item的能力,AUC=0.5
  • 序列长度小于50的时候,随着random抽样结果不同,AUC在0.5和0.9之间波动,但即使序列很短,比如10个,效果也很差
  • 还原效果上,随机生成序列显著低于用户真实序列

什么原因?

物品向量在embedding空间是分散状的,当我们随机抽样点时,计算AveragePooling的均值向量位置与随机分布的这些抽样点的夹角有大有小,没法与大部分点共线,因此失去区分度,AUC->0.5。ALS相当于在原始Embedding空间上做一个线性初等变换(旋转缩放),再对随机点做AveragePooling,所以结论也类似。

在推荐系统中,还是会遇到比较随机/兴趣点发散的用户的,毕竟要做 千人千面,即使发现了item之间的相关性,也不能因为一个用户点了10个看起来无关的物品,就把这个人的向量学成一个均值。

事实上,如果必须在Embedding空间/或线性变换后的空间上搞,userFactor肯定无非平凡解(不存在与一批正交向量都共线的向量)。所以关键在于,能否在Embedding空间上做一个 非线性变换 ,把这些相对分散的点聚合到一条线上,这样userFactor就理论上就有解了。

非线性变化

尝试Transformer?

一个可行的方向是使用Transformer,但注意,不是 在他的Embedding层上直接AveragePooling(如图左),而是 使用attention后的输出层的embedding(这个才是非线性变化后的空间,如图右),作为 context-awareitemFactors,进行AveragePooling的向量作为userFactor,应该就可以了。

这只是一个推论,具体还没有尝试。

5、总结

ALS可以改造成inductive的方法,在训练阶段,重点学习全量物品向量,在预估阶段,加载用户点击过的物品向量,进行平均池化,作为用户向量即可。

当然,这是在物品相对固定的情况下。如果物品向量需要更新的话,还是得复现单步的ALS更新,但公式也在文中给出初步复现,这样就可以持续转下去了。

此外,ALS方法对于兴趣点分散的用户,可能会退化为平凡解,无法精确还原点击序列,可能需要非线性的模型才能更好建模。

6、后续更新:线上应用

在线上应用时发现,用户向量的计算是可行的,但物品向量的更新却比较麻烦。

原因是,用户互动过的物品列表长度最多大几百个,但一个物品的互动用户列表,长度就可能有几百万(因为用户集合量级很大,有几十亿)。

为什么Spark ALS能处理长列表

因为在里面有个 Block机制,就是把用户和物品都Hash到不同的块中,比如,50个用户Block,10个物品Block。对于一个物品来说,可以获得50个列表,每个列表可以生成对应的AtA和Atb矩阵,然后把这50个矩阵汇总相加,进行法方程的求解。Map Reduce!

这个逻辑用python模拟如下,

import numpy as np
import random
from collections import defaultdict

# --- 复用之前的求解器 ---
def solve_normal_equation(ata, atb, regParam, rank):
  """ 使用 Cholesky 分解求解正规方程 (物品因子) """
  try:
    # L2 正则化项: lambda * I (I 是单位矩阵)
    ata_reg = ata + np.eye(rank) * regParam
    L = np.linalg.cholesky(ata_reg)
    L_inv = np.linalg.inv(L)
    item_factors = np.dot(np.dot(L_inv.T, L_inv), atb)
  except np.linalg.LinAlgError:
    print(f"警告: 求解物品因子时矩阵奇异,返回零向量。")
    item_factors = np.zeros(rank)
  return item_factors

# --- 模拟计算物品因子 ---
def compute_item_factor_simulated_blocks(
    item_ratings_list,  # [(user_code, rating), ...] for this item
    user_factors_dict,  # {user_code: factor_array, ...}
    UtU,                # Precomputed User Gramian Matrix (rank x rank), can be None if not implicit
    rank=24,
    alpha=500,
    regParam=0.01,
    num_user_blocks=10, # 模拟的用户块数量
    implicit=True       # 是否是 implicit 模式
    ):
    """
    模拟使用用户分块来计算单个物品的因子。
    注意:这是一个模拟,所有计算仍在单机进行。
    """

    # --- 1. 初始化 N 个块的累加器 ---
    # 每个累加器存储 (ata_partial, atb_partial)
    # 使用 defaultdict 可以方便地处理空块
    block_accumulators = defaultdict(lambda: (np.zeros((rank, rank)), np.zeros(rank)))

    # --- 2. 模拟将用户分散到不同块并计算贡献 ---
    print(f"模拟将 {len(item_ratings_list)} 个用户评分分散到 {num_user_blocks} 个块...")
    for user_code, rating in item_ratings_list:
        user_factor = np.array(user_factors_dict.get(user_code, np.zeros(rank))) # 获取用户因子

        # 计算这个用户对 Ata 和 Atb 的贡献 delta_ata, delta_atb
        delta_ata = np.zeros((rank, rank))
        delta_atb = np.zeros(rank)

        if implicit:
            if rating > 0: # implicit 模式下,通常只考虑正反馈,但 ALS-WR 论文允许负反馈影响置信度
                c_ui = alpha * abs(rating) # 置信度 confidence
                p_ui = 1.0 if rating > 0 else 0.0 # 偏好 preference
                delta_ata = np.outer(user_factor, user_factor) * c_ui
                delta_atb = user_factor * c_ui * p_ui   
        else: # Explicit 模式
            delta_ata = np.outer(user_factor, user_factor)
            delta_atb = user_factor * rating 

        # 确定用户所属的块
        block_id = user_code % num_user_blocks

        # 将贡献累加到对应的块累加器
        current_ata, current_atb = block_accumulators[block_id]
        block_accumulators[block_id] = (current_ata + delta_ata, current_atb + delta_atb)

    # --- 3. 模拟汇总来自所有块的结果 ---
    print("模拟汇总各块的计算结果...")
    Ata_total = np.zeros((rank, rank))
    Atb_total = np.zeros(rank)

    num_processed_blocks = 0
    for block_id in range(num_user_blocks):
        if block_id in block_accumulators: # 仅处理有数据的块
            ata_partial, atb_partial = block_accumulators[block_id]
            Ata_total += ata_partial
            Atb_total += atb_partial
            num_processed_blocks += 1

    print(f"汇总了 {num_processed_blocks} 个块的 Ata ({Ata_total.shape}) 和 Atb ({Atb_total.shape})。")

    # --- 4. 求解最终方程 ---
    print("求解最终的正规方程...")
    if implicit:
        final_ata = np.zeros((rank, rank))
        if UtU is not None:
            final_ata += UtU # UtU = sum_j u_j u_j^T
        final_atb = np.zeros(rank)
        for user_code, rating in item_ratings_list:
            user_factor = np.array(user_factors_dict.get(user_code, np.zeros(rank)))
            if rating > 0: # 只考虑正反馈计算贡献
                 c_ui = alpha * abs(rating) # 或者 alpha * rating
                 p_ui = 1.0
                 final_ata += np.outer(user_factor, user_factor) * c_ui
                 final_atb += user_factor * c_ui * p_ui

        updated_item_factor = solve_normal_equation(final_ata, final_atb, regParam, rank)

    else: # Explicit
        updated_item_factor = solve_normal_equation(Ata_total, Atb_total, regParam, rank)


    return updated_item_factor

# --- 准备模拟数据 ---
RANK = 5
NUM_USERS = 10000
NUM_ITEMS = 100
NUM_BLOCKS = 10 # 模拟用户块数

# 1. 模拟用户因子
user_factors = {i: np.random.rand(RANK).astype(np.float32) for i in range(NUM_USERS)}

# 2. 模拟目标物品的评分列表 (假设 item 0 被很多用户评过)
target_item = 0
# item_ratings = [(random.randint(0, NUM_USERS - 1), random.uniform(0.1, 1.0)) for _ in range(50000)] # 5万次评分
item_ratings = []
for u in range(NUM_USERS):
    if random.random() < 0.8: # 80% 的用户评价过 item 0
        item_ratings.append((u, random.uniform(0.1, 1.0)))
print(f"物品 {target_item}{len(item_ratings)} 个评分。")


# 3. 预计算 UtU (仅 implicit 需要)
precomputed_UtU = None
IS_IMPLICIT = True # 设置为 True 或 False
if IS_IMPLICIT:
    print("预计算 UtU...")
    all_user_factors_array = np.array(list(user_factors.values()))
    precomputed_UtU = np.dot(all_user_factors_array.T, all_user_factors_array)
    print("UtU 计算完成。")

# --- 执行模拟 ---
updated_factor = compute_item_factor_simulated_blocks(
    item_ratings_list=item_ratings,
    user_factors_dict=user_factors,
    UtU=precomputed_UtU, # 传入预计算的 UtU
    rank=RANK,
    num_user_blocks=NUM_BLOCKS,
    implicit=IS_IMPLICIT,
    regParam=0.1,
    alpha=10 # implicit alpha
)

print(f"\n物品 {target_item} 更新后的因子向量 (模拟 {NUM_BLOCKS} 块):")
print(updated_factor)

最终的方案:基于源码扩展

在Scala源码ALS的基础上进行继承,在保留原接口的基础上,增加了单步更新的静态方法,类名为 org.apache.spark.ml.recommendation.IncrementalALS

虽然此实现标准的 fit() 方法行为与原始 Spark ALS 一致(从头开始训练),但其主要增强在于提供了 单步更新能力。它通过 PySpark 包装器提供了可访问的静态方法 (stepUser, stepItem),允许用户使用预先存在的另一方因子来执行用户因子或物品因子的单次 ALS 更新迭代。这有助于实现 在线更新微调 等场景,在这些场景中,只需要根据新的交互数据更新一组因子,同时保持与先前状态的向量连续性。

该实现被设计为一个独立的扩展,需要用户在其 Spark 环境中包含编译后的 JAR 文件和可能的 Python 包装器脚本。

版权声明

以上文章为本人@迪吉老农原创,文责自负。文中如有引用他人内容的部分(包括文字或图片),均已明文指出,或做出明确的引用标记。如需转载,请联系作者,并取得作者的明示同意。感谢。

Comments