Spark ALS 增量训练扩展 (Spark ALS Incremental Training Extension)¶
项目概述¶
本项目是对 Apache Spark MLlib 中 ALS (Alternating Least Squares) 推荐算法的一个扩展实现 (org.apache.spark.ml.recommendation.IncrementalALS)。其核心目标是支持增量训练,允许用户加载先前训练好的 ALS 模型(用户和物品因子),并在这些已有因子的基础上,利用新的数据继续进行训练,而不是每次都从随机初始化开始。
这个实现被封装在一个独立的项目中,包含核心的 Scala 实现和一个可选的 PySpark 封装脚本,便于在不同环境中使用。
核心特性¶
- 增量训练: 支持从已有的 User/Item Factors DataFrame 开始训练。
- 独立部署: Scala 核心作为外部 JAR 包使用,无需修改 Spark 自身代码。
- PySpark 封装: 提供 Python 封装类 (IncrementalALS),继承自 pyspark.ml.recommendation.ALS,提供与原生 ALS 相似的 API 体验。
更多信息请参考源码:spark-incremental-als
这个notebook用来测试模块功能
In [1]:
Copied!
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.sql.functions import col, udf, expr, collect_set, lit
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator
import pandas as pd
import numpy as np
import random
import argparse
import faiss
from typing import Tuple
from loguru import logger
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.sql.functions import col, udf, expr, collect_set, lit
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator
import pandas as pd
import numpy as np
import random
import argparse
import faiss
from typing import Tuple
from loguru import logger
In [2]:
Copied!
__doc__ = """
ALS增量更新算法
"""
__doc__ = """
ALS增量更新算法
"""
In [3]:
Copied!
# Configure pandas display: widen cells so long strings (e.g. topk_related) are not truncated.
pd.options.display.max_colwidth = 500
# (duplicated cell body from the notebook export)
pd.options.display.max_colwidth = 500
In [4]:
Copied!
def in_notebook():
    """Return True when running inside a Jupyter notebook (ZMQ kernel), else False."""
    try:
        shell_name = get_ipython().__class__.__name__
    except NameError:
        # get_ipython only exists inside IPython environments.
        return False
    return shell_name == 'ZMQInteractiveShell'
def get_spark():
    """
    Return a SparkSession with Hive support, creating one if none exists.

    Fix: the previous guard ``if 'spark' not in locals()`` was always true
    inside a function (locals() cannot contain ``spark`` before assignment),
    and its false path would have raised UnboundLocalError on ``return spark``.
    ``getOrCreate()`` already reuses an existing session, so the guard is
    removed.
    """
    spark = SparkSession.builder \
        .appName("ALS") \
        .config("spark.sql.catalogImplementation", "hive") \
        .enableHiveSupport() \
        .getOrCreate()
    # Keep driver output quiet so notebook/log output stays readable.
    spark.sparkContext.setLogLevel("ERROR")
    return spark
def split_dataset(dataset, train_test_split=0.9, seed=123, split_by_user=True, add_negative_samples=False, neg_sample_ratio=1.0):
    """
    Split the dataset into train/test sets, optionally adding negative samples.

    Args:
        dataset: Spark DataFrame with user_code / tag_code / weight columns.
        train_test_split: fraction of rows assigned to the training set.
        seed: random seed for the row-level split.
        split_by_user: if True, keep only a deterministic user-id slice in the test set.
        add_negative_samples: if True, also add random negatives to the training set.
        neg_sample_ratio: negatives per positive generated for each user.

    Returns:
        (train_data, test_data) Spark DataFrames with user_code, tag_code, weight.

    NOTE(review): relies on the module-level ``spark`` session and file-top
    imports of ``random`` and ``collect_set``.
    """
    def add_random_negative_samples(df, all_tags, ratio=1.0):
        """Generate random negative samples (weight 0.0) for every user."""
        def add_negative_samples(row):
            # Sample only from tags the user has NOT interacted with.
            # NOTE(review): random.sample raises ValueError if the candidate
            # pool is smaller than the requested count -- assumes a large tag universe.
            user_code = row['user_code']
            positive_tags = set(row['positive_tags'])
            negative_tags = random.sample(list(all_tags.value - positive_tags), int(len(positive_tags) * ratio))
            return [(user_code, tag_code, 0.0) for tag_code in negative_tags]
        # Preserve the input parallelism after the groupBy.
        partitions = df.rdd.getNumPartitions()
        df_grouped = df.groupby('user_code').agg(collect_set('tag_code').alias('positive_tags')).repartition(partitions)
        rdd_neg = df_grouped.rdd.flatMap(add_negative_samples)
        return spark.createDataFrame(rdd_neg, ['user_code', 'tag_code', 'weight'])
    # Row-level random split.
    train_data, test_data = dataset.select("user_code", "tag_code", "weight") \
        .randomSplit([train_test_split, 1 - train_test_split], seed=seed)
    # Optionally restrict the test set to a deterministic slice of user ids.
    if split_by_user:
        dividend = int(1.0 / (1 - train_test_split))
        test_data = test_data.where(f"user_code % {dividend} = 1")
    # Broadcast the full tag universe for negative sampling.
    all_tags = dataset.select('tag_code').distinct().rdd.map(lambda row: row['tag_code']).collect()
    all_tags = spark.sparkContext.broadcast(set(all_tags))
    logger.info(f"数据中总标签数:{len(all_tags.value)}")
    # Test-set negatives are always added.
    test_data_neg = add_random_negative_samples(test_data, all_tags, ratio=neg_sample_ratio)
    test_data = test_data.union(test_data_neg)
    # Training-set negatives are optional.
    if add_negative_samples:
        train_data_neg = add_random_negative_samples(train_data, all_tags, ratio=neg_sample_ratio)
        train_data = train_data.union(train_data_neg)
    return train_data, test_data
def evaluate(model, data, threshold=0.75):
    """Score `data` with `model` and return {"rmse": ..., "AUC": ...}."""
    scored = model.transform(data).cache()
    scored = scored.withColumn("prediction", col("prediction").cast("double"))
    # Binarize the weight into a 0/1 label for the AUC computation.
    scored = scored.withColumn("label", expr(f"if(weight > {threshold}, 1.0, 0.0)"))
    rmse_evaluator = RegressionEvaluator(metricName="rmse", labelCol="weight", predictionCol="prediction")
    auc_evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="label")
    metrics = {}
    metrics["rmse"] = rmse_evaluator.evaluate(scored)
    logger.info("Root-mean-square error = " + str(metrics["rmse"]))
    metrics["AUC"] = auc_evaluator.evaluate(scored)
    logger.info("AUC = " + str(metrics["AUC"]))
    scored.unpersist()
    return metrics
def prepare_item_features(model: ALSModel) -> pd.DataFrame:
    """
    Join the model's item factors with the tag dimension table; return as pandas.

    NOTE(review): relies on the module-level ``spark`` session; the WHERE
    clause filters the tag dimension table's partitions (dt='working', pt='obj').
    """
    model.itemFactors.createOrReplaceTempView("item_factors")
    item_feature = spark.sql("""
    SELECT
    a.tag_id AS tag_id,
    a.code AS tag_code,
    a.tag_name AS tag_name,
    b.features
    FROM yandi_bigdata_vf_tag_dim a
    JOIN item_factors b ON a.code = b.id
    WHERE dt='working' AND pt='obj'
    """)
    return item_feature.toPandas()
def normalize_features(item_feature_df: pd.DataFrame) -> Tuple[pd.DataFrame, np.ndarray]:
    """
    L2-normalize the item feature matrix and attach a printable string form.

    Adds a ``features_norm`` column (comma-joined, 5-decimal strings) and
    returns (dataframe, normalized matrix).

    Fix: an all-zero feature vector previously produced NaN/inf through
    division by zero; its norm is now clamped to 1 so it stays a zero vector.
    """
    feature_matrix = np.stack(item_feature_df["features"])
    logger.info("Feature matrix shape: {}", feature_matrix.shape)
    norms = np.linalg.norm(feature_matrix, axis=1, keepdims=True)
    # Guard against division by zero: zero vectors remain zero.
    norms[norms == 0.0] = 1.0
    normalized_matrix = feature_matrix / norms
    item_feature_df["features_norm"] = [
        ",".join(f"{value:.5f}" for value in row) for row in normalized_matrix
    ]
    return item_feature_df, normalized_matrix
def create_faiss_index(index_type: str, rank: int) -> faiss.Index:
    """Build a flat faiss index of dimension `rank`; `index_type` is "IP" or "L2"."""
    if index_type == "IP":
        return faiss.IndexFlatIP(rank)
    if index_type == "L2":
        return faiss.IndexFlatL2(rank)
    raise ValueError(f"Invalid faiss_index_type: {index_type}")
def compute_similarity(index: faiss.Index, feature_matrix: np.ndarray, topk_items: int) -> Tuple[np.ndarray, np.ndarray]:
    """Add all vectors to the index and search each against it; returns (distances, neighbor ids)."""
    index.add(feature_matrix)
    distances, neighbor_ids = index.search(feature_matrix, topk_items)
    return distances, neighbor_ids
def add_topk_related(item_feature_df: pd.DataFrame, I: np.ndarray, D: np.ndarray) -> pd.DataFrame:
    """
    Attach a ``topk_related`` column: "name:dist|name:dist|..." per row.

    I/D are faiss search results: I[r][k] is the *positional* row number of
    row r's k-th neighbour, D[r][k] its distance/score.

    Fix: the previous ``Series.to_dict()`` keyed names by the dataframe
    *index*, while faiss returns positional row numbers — any non-default
    index silently produced wrong or empty neighbour strings. Names are now
    keyed by position (a dict is kept so an out-of-range id, e.g. faiss's
    -1 "missing" marker, still falls into the best-effort except branch).
    """
    tag_name_dict = dict(enumerate(item_feature_df["tag_name"]))
    def compute_topk_related(id_: int):
        try:
            return "|".join([
                f"{tag_name_dict[I[id_][k]]}:{D[id_][k]:.1f}" for k in range(I.shape[1])
            ])
        except Exception as e:
            # Best-effort: a malformed row yields "" instead of failing the batch.
            logger.error(e)
            return ""
    item_feature_df["topk_related"] = [compute_topk_related(id_) for id_ in range(len(item_feature_df))]
    return item_feature_df
def log_matches(tags: list, item_feature_df: pd.DataFrame) -> None:
    """Log tag_id and the top-K related string for every row whose tag_name is in `tags`."""
    for tag_name in tags:
        mask = item_feature_df.tag_name == tag_name
        for row in item_feature_df.loc[mask].itertuples():
            logger.info(f"{tag_name} -> {row.tag_id} -> {row.topk_related}")
def save_to_database(item_feature_df: pd.DataFrame, args) -> None:
    """
    Overwrite the target Hive partition with the vectorized tag features.

    Writes tag_id / tag_code / tag_name / features_norm / topk_related into
    ``bigdata_vf_als_tag_encoding`` at partition (dt=args.dataset_dt,
    pt=args.dataset_pt). Relies on the module-level ``spark`` session.
    """
    df = spark.createDataFrame(item_feature_df)
    df.createOrReplaceTempView("item_feature_df")
    table = "bigdata_vf_als_tag_encoding"
    spark.sql(f"""
    INSERT OVERWRITE TABLE {table} PARTITION(dt='{args.dataset_dt}', pt='{args.dataset_pt}')
    SELECT tag_id, tag_code, tag_name, features_norm, topk_related
    FROM item_feature_df
    """)
    logger.info(f"向量化标签特征: {table} partition(dt={args.dataset_dt}, pt={args.dataset_pt})")
def in_notebook():
    """Return True when running inside a Jupyter notebook (ZMQ kernel), else False."""
    try:
        shell_name = get_ipython().__class__.__name__
    except NameError:
        # get_ipython only exists inside IPython environments.
        return False
    return shell_name == 'ZMQInteractiveShell'
def get_spark():
    """
    Return a SparkSession with Hive support, creating one if none exists.

    Fix: the previous guard ``if 'spark' not in locals()`` was always true
    inside a function (locals() cannot contain ``spark`` before assignment),
    and its false path would have raised UnboundLocalError on ``return spark``.
    ``getOrCreate()`` already reuses an existing session, so the guard is
    removed.
    """
    spark = SparkSession.builder \
        .appName("ALS") \
        .config("spark.sql.catalogImplementation", "hive") \
        .enableHiveSupport() \
        .getOrCreate()
    # Keep driver output quiet so notebook/log output stays readable.
    spark.sparkContext.setLogLevel("ERROR")
    return spark
def split_dataset(dataset, train_test_split=0.9, seed=123, split_by_user=True, add_negative_samples=False, neg_sample_ratio=1.0):
    """
    Split the dataset into train/test sets, optionally adding negative samples.

    Args:
        dataset: Spark DataFrame with user_code / tag_code / weight columns.
        train_test_split: fraction of rows assigned to the training set.
        seed: random seed for the row-level split.
        split_by_user: if True, keep only a deterministic user-id slice in the test set.
        add_negative_samples: if True, also add random negatives to the training set.
        neg_sample_ratio: negatives per positive generated for each user.

    Returns:
        (train_data, test_data) Spark DataFrames with user_code, tag_code, weight.

    NOTE(review): relies on the module-level ``spark`` session and file-top
    imports of ``random`` and ``collect_set``.
    """
    def add_random_negative_samples(df, all_tags, ratio=1.0):
        """Generate random negative samples (weight 0.0) for every user."""
        def add_negative_samples(row):
            # Sample only from tags the user has NOT interacted with.
            # NOTE(review): random.sample raises ValueError if the candidate
            # pool is smaller than the requested count -- assumes a large tag universe.
            user_code = row['user_code']
            positive_tags = set(row['positive_tags'])
            negative_tags = random.sample(list(all_tags.value - positive_tags), int(len(positive_tags) * ratio))
            return [(user_code, tag_code, 0.0) for tag_code in negative_tags]
        # Preserve the input parallelism after the groupBy.
        partitions = df.rdd.getNumPartitions()
        df_grouped = df.groupby('user_code').agg(collect_set('tag_code').alias('positive_tags')).repartition(partitions)
        rdd_neg = df_grouped.rdd.flatMap(add_negative_samples)
        return spark.createDataFrame(rdd_neg, ['user_code', 'tag_code', 'weight'])
    # Row-level random split.
    train_data, test_data = dataset.select("user_code", "tag_code", "weight") \
        .randomSplit([train_test_split, 1 - train_test_split], seed=seed)
    # Optionally restrict the test set to a deterministic slice of user ids.
    if split_by_user:
        dividend = int(1.0 / (1 - train_test_split))
        test_data = test_data.where(f"user_code % {dividend} = 1")
    # Broadcast the full tag universe for negative sampling.
    all_tags = dataset.select('tag_code').distinct().rdd.map(lambda row: row['tag_code']).collect()
    all_tags = spark.sparkContext.broadcast(set(all_tags))
    logger.info(f"数据中总标签数:{len(all_tags.value)}")
    # Test-set negatives are always added.
    test_data_neg = add_random_negative_samples(test_data, all_tags, ratio=neg_sample_ratio)
    test_data = test_data.union(test_data_neg)
    # Training-set negatives are optional.
    if add_negative_samples:
        train_data_neg = add_random_negative_samples(train_data, all_tags, ratio=neg_sample_ratio)
        train_data = train_data.union(train_data_neg)
    return train_data, test_data
def evaluate(model, data, threshold=0.75):
    """Score `data` with `model` and return {"rmse": ..., "AUC": ...}."""
    scored = model.transform(data).cache()
    scored = scored.withColumn("prediction", col("prediction").cast("double"))
    # Binarize the weight into a 0/1 label for the AUC computation.
    scored = scored.withColumn("label", expr(f"if(weight > {threshold}, 1.0, 0.0)"))
    rmse_evaluator = RegressionEvaluator(metricName="rmse", labelCol="weight", predictionCol="prediction")
    auc_evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="label")
    metrics = {}
    metrics["rmse"] = rmse_evaluator.evaluate(scored)
    logger.info("Root-mean-square error = " + str(metrics["rmse"]))
    metrics["AUC"] = auc_evaluator.evaluate(scored)
    logger.info("AUC = " + str(metrics["AUC"]))
    scored.unpersist()
    return metrics
def prepare_item_features(model: ALSModel) -> pd.DataFrame:
    """
    Join the model's item factors with the tag dimension table; return as pandas.

    NOTE(review): relies on the module-level ``spark`` session; the WHERE
    clause filters the tag dimension table's partitions (dt='working', pt='obj').
    """
    model.itemFactors.createOrReplaceTempView("item_factors")
    item_feature = spark.sql("""
    SELECT
    a.tag_id AS tag_id,
    a.code AS tag_code,
    a.tag_name AS tag_name,
    b.features
    FROM yandi_bigdata_vf_tag_dim a
    JOIN item_factors b ON a.code = b.id
    WHERE dt='working' AND pt='obj'
    """)
    return item_feature.toPandas()
def normalize_features(item_feature_df: pd.DataFrame) -> Tuple[pd.DataFrame, np.ndarray]:
    """
    L2-normalize the item feature matrix and attach a printable string form.

    Adds a ``features_norm`` column (comma-joined, 5-decimal strings) and
    returns (dataframe, normalized matrix).

    Fix: an all-zero feature vector previously produced NaN/inf through
    division by zero; its norm is now clamped to 1 so it stays a zero vector.
    """
    feature_matrix = np.stack(item_feature_df["features"])
    logger.info("Feature matrix shape: {}", feature_matrix.shape)
    norms = np.linalg.norm(feature_matrix, axis=1, keepdims=True)
    # Guard against division by zero: zero vectors remain zero.
    norms[norms == 0.0] = 1.0
    normalized_matrix = feature_matrix / norms
    item_feature_df["features_norm"] = [
        ",".join(f"{value:.5f}" for value in row) for row in normalized_matrix
    ]
    return item_feature_df, normalized_matrix
def create_faiss_index(index_type: str, rank: int) -> faiss.Index:
    """Build a flat faiss index of dimension `rank`; `index_type` is "IP" or "L2"."""
    if index_type == "IP":
        return faiss.IndexFlatIP(rank)
    if index_type == "L2":
        return faiss.IndexFlatL2(rank)
    raise ValueError(f"Invalid faiss_index_type: {index_type}")
def compute_similarity(index: faiss.Index, feature_matrix: np.ndarray, topk_items: int) -> Tuple[np.ndarray, np.ndarray]:
    """Add all vectors to the index and search each against it; returns (distances, neighbor ids)."""
    index.add(feature_matrix)
    distances, neighbor_ids = index.search(feature_matrix, topk_items)
    return distances, neighbor_ids
def add_topk_related(item_feature_df: pd.DataFrame, I: np.ndarray, D: np.ndarray) -> pd.DataFrame:
    """
    Attach a ``topk_related`` column: "name:dist|name:dist|..." per row.

    I/D are faiss search results: I[r][k] is the *positional* row number of
    row r's k-th neighbour, D[r][k] its distance/score.

    Fix: the previous ``Series.to_dict()`` keyed names by the dataframe
    *index*, while faiss returns positional row numbers — any non-default
    index silently produced wrong or empty neighbour strings. Names are now
    keyed by position (a dict is kept so an out-of-range id, e.g. faiss's
    -1 "missing" marker, still falls into the best-effort except branch).
    """
    tag_name_dict = dict(enumerate(item_feature_df["tag_name"]))
    def compute_topk_related(id_: int):
        try:
            return "|".join([
                f"{tag_name_dict[I[id_][k]]}:{D[id_][k]:.1f}" for k in range(I.shape[1])
            ])
        except Exception as e:
            # Best-effort: a malformed row yields "" instead of failing the batch.
            logger.error(e)
            return ""
    item_feature_df["topk_related"] = [compute_topk_related(id_) for id_ in range(len(item_feature_df))]
    return item_feature_df
def log_matches(tags: list, item_feature_df: pd.DataFrame) -> None:
    """Log tag_id and the top-K related string for every row whose tag_name is in `tags`."""
    for tag_name in tags:
        mask = item_feature_df.tag_name == tag_name
        for row in item_feature_df.loc[mask].itertuples():
            logger.info(f"{tag_name} -> {row.tag_id} -> {row.topk_related}")
def save_to_database(item_feature_df: pd.DataFrame, args) -> None:
    """
    Overwrite the target Hive partition with the vectorized tag features.

    Writes tag_id / tag_code / tag_name / features_norm / topk_related into
    ``bigdata_vf_als_tag_encoding`` at partition (dt=args.dataset_dt,
    pt=args.dataset_pt). Relies on the module-level ``spark`` session.
    """
    df = spark.createDataFrame(item_feature_df)
    df.createOrReplaceTempView("item_feature_df")
    table = "bigdata_vf_als_tag_encoding"
    spark.sql(f"""
    INSERT OVERWRITE TABLE {table} PARTITION(dt='{args.dataset_dt}', pt='{args.dataset_pt}')
    SELECT tag_id, tag_code, tag_name, features_norm, topk_related
    FROM item_feature_df
    """)
    logger.info(f"向量化标签特征: {table} partition(dt={args.dataset_dt}, pt={args.dataset_pt})")
In [5]:
Copied!
# ===============================
# Main entry: configuration for ALS model training and evaluation
# ===============================
class Args:
    """Notebook-mode defaults; overridden via argparse when run as a script."""
    seed = 123
    dataset = "bigdata_vf_als_user_tag_tuple_test"
    dataset_dt = "20250306"
    dataset_pt = "long_obj"
    dataset_substr = "5"
    maxIter = 10
    regParam = 0.01
    rank = 24
    implicitPrefs = True
    alpha = 500
    train_test_split = 0.9
    als_blocks = 10
    split_by_user = True
    add_negative_samples = False
    neg_sample_ratio = 1.0
    transform = "power4"
    topk_items = 50
    faiss_index_type = "IP"
args = Args()
if not in_notebook():
    example = """
# 示例
python train.py --seed 123 --dataset bigdata_vf_als_user_tag_tuple --dataset_dt 20241015 --dataset_pt long_obj --maxIter 10 --regParam 0.01 --rank 10 --implicitPrefs --alpha 200 --train_test_split 0.9 --split_by_user --no_add_negative_samples --neg_sample_ratio 1.0 --transform power4 --topk_items 50 --faiss_index_type IP
"""
    parser = argparse.ArgumentParser(description=__doc__, epilog=example, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument("--seed", type=int, default=args.seed, help="随机种子")
    # dataset group
    group = parser.add_argument_group("dataset")
    group.add_argument("--dataset", type=str, default=args.dataset, help="数据集表名")
    group.add_argument("--dataset_dt", type=str, default=args.dataset_dt, help="数据集日期字段名")
    group.add_argument("--dataset_pt", type=str, default=args.dataset_pt, help="数据集分区字段名")
    group.add_argument("--dataset_substr", type=str, default=None, help="数据集分区子串,逗号分隔,例如1,4")
    # model group
    group = parser.add_argument_group("model")
    group.add_argument("--maxIter", type=int, default=args.maxIter, help="最大迭代次数")
    group.add_argument("--regParam", type=float, default=args.regParam, help="正则化参数")
    group.add_argument("--rank", type=int, default=args.rank, help="潜在因子数")
    group.add_argument("--alpha", type=float, default=args.alpha, help="ALS模型参数alpha")
    group.add_argument("--implicitPrefs", action="store_true", default=args.implicitPrefs, help="是否使用隐式反馈")
    group.add_argument("--no_implicitPrefs", dest="implicitPrefs", action="store_false", help="不使用隐式反馈")
    group.add_argument("--transform", choices=["power5", "power4", "power3", "power2", "logit", "none"], default=args.transform, help="评分数据转换")
    # Fix: default was a hard-coded 20, inconsistent with Args.als_blocks (10).
    group.add_argument("--als_blocks", type=int, default=args.als_blocks, help="ALS计算的并行度")
    # split group
    group = parser.add_argument_group("split")
    group.add_argument("--train_test_split", type=float, default=args.train_test_split, help="训练集和测试集的分割比例")
    group.add_argument("--neg_sample_ratio", type=float, default=args.neg_sample_ratio, help="负样本比例")
    group.add_argument("--split_by_user", action="store_true", default=args.split_by_user, help="是否按用户分割数据集")
    group.add_argument("--add_negative_samples", action="store_true", default=args.add_negative_samples, help="是否添加训练集负样本")
    group.add_argument("--no_add_negative_samples", dest="add_negative_samples", action="store_false", help="不添加训练集负样本")
    # faiss group
    group = parser.add_argument_group("faiss")
    group.add_argument("--topk_items", type=int, default=args.topk_items, help="TopK相似物品数")
    # Fix: default was a hard-coded "IP"; use Args.faiss_index_type for consistency.
    group.add_argument("--faiss_index_type", choices=["IP", "L2"], default=args.faiss_index_type, help="faiss索引类型")
    args = parser.parse_args()
    if not args.dataset_substr:
        # Empty/None means "all uid digits 0-9".
        args.dataset_substr = ",".join(str(i) for i in range(10))
# ===============================
# Main entry: configuration for ALS model training and evaluation
# ===============================
class Args:
    """Notebook-mode defaults; overridden via argparse when run as a script."""
    seed = 123
    dataset = "bigdata_vf_als_user_tag_tuple_test"
    dataset_dt = "20250306"
    dataset_pt = "long_obj"
    dataset_substr = "5"
    maxIter = 10
    regParam = 0.01
    rank = 24
    implicitPrefs = True
    alpha = 500
    train_test_split = 0.9
    als_blocks = 10
    split_by_user = True
    add_negative_samples = False
    neg_sample_ratio = 1.0
    transform = "power4"
    topk_items = 50
    faiss_index_type = "IP"
args = Args()
if not in_notebook():
    example = """
# 示例
python train.py --seed 123 --dataset bigdata_vf_als_user_tag_tuple --dataset_dt 20241015 --dataset_pt long_obj --maxIter 10 --regParam 0.01 --rank 10 --implicitPrefs --alpha 200 --train_test_split 0.9 --split_by_user --no_add_negative_samples --neg_sample_ratio 1.0 --transform power4 --topk_items 50 --faiss_index_type IP
"""
    parser = argparse.ArgumentParser(description=__doc__, epilog=example, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument("--seed", type=int, default=args.seed, help="随机种子")
    # dataset group
    group = parser.add_argument_group("dataset")
    group.add_argument("--dataset", type=str, default=args.dataset, help="数据集表名")
    group.add_argument("--dataset_dt", type=str, default=args.dataset_dt, help="数据集日期字段名")
    group.add_argument("--dataset_pt", type=str, default=args.dataset_pt, help="数据集分区字段名")
    group.add_argument("--dataset_substr", type=str, default=None, help="数据集分区子串,逗号分隔,例如1,4")
    # model group
    group = parser.add_argument_group("model")
    group.add_argument("--maxIter", type=int, default=args.maxIter, help="最大迭代次数")
    group.add_argument("--regParam", type=float, default=args.regParam, help="正则化参数")
    group.add_argument("--rank", type=int, default=args.rank, help="潜在因子数")
    group.add_argument("--alpha", type=float, default=args.alpha, help="ALS模型参数alpha")
    group.add_argument("--implicitPrefs", action="store_true", default=args.implicitPrefs, help="是否使用隐式反馈")
    group.add_argument("--no_implicitPrefs", dest="implicitPrefs", action="store_false", help="不使用隐式反馈")
    group.add_argument("--transform", choices=["power5", "power4", "power3", "power2", "logit", "none"], default=args.transform, help="评分数据转换")
    # Fix: default was a hard-coded 20, inconsistent with Args.als_blocks (10).
    group.add_argument("--als_blocks", type=int, default=args.als_blocks, help="ALS计算的并行度")
    # split group
    group = parser.add_argument_group("split")
    group.add_argument("--train_test_split", type=float, default=args.train_test_split, help="训练集和测试集的分割比例")
    group.add_argument("--neg_sample_ratio", type=float, default=args.neg_sample_ratio, help="负样本比例")
    group.add_argument("--split_by_user", action="store_true", default=args.split_by_user, help="是否按用户分割数据集")
    group.add_argument("--add_negative_samples", action="store_true", default=args.add_negative_samples, help="是否添加训练集负样本")
    group.add_argument("--no_add_negative_samples", dest="add_negative_samples", action="store_false", help="不添加训练集负样本")
    # faiss group
    group = parser.add_argument_group("faiss")
    group.add_argument("--topk_items", type=int, default=args.topk_items, help="TopK相似物品数")
    # Fix: default was a hard-coded "IP"; use Args.faiss_index_type for consistency.
    group.add_argument("--faiss_index_type", choices=["IP", "L2"], default=args.faiss_index_type, help="faiss索引类型")
    args = parser.parse_args()
    if not args.dataset_substr:
        # Empty/None means "all uid digits 0-9".
        args.dataset_substr = ",".join(str(i) for i in range(10))
In [6]:
Copied!
# Acquire (or reuse) the Spark session; the bare `spark` echoes it in notebook output.
spark = get_spark()
spark
# (duplicated cell body from the notebook export)
spark = get_spark()
spark
Out[6]:
SparkSession - hive
加载数据集¶
In [7]:
Copied!
# Load the dataset from the configured Hive table/partition.
# The substr(uid, ...) predicates subsample users by uid digits.
dataset = spark.sql(f"""
select * from {args.dataset}
where dt='{args.dataset_dt}' and pt='{args.dataset_pt}'
and substr(uid, 3, 1) in ({args.dataset_substr})
and substr(uid, 4, 1) = '0'
""")
logger.info("数据集的schema:")
dataset.printSchema()
# (duplicated cell body from the notebook export)
dataset = spark.sql(f"""
select * from {args.dataset}
where dt='{args.dataset_dt}' and pt='{args.dataset_pt}'
and substr(uid, 3, 1) in ({args.dataset_substr})
and substr(uid, 4, 1) = '0'
""")
logger.info("数据集的schema:")
dataset.printSchema()
2025-03-29 17:50:56.575 | INFO | __main__:<module>:9 - 数据集的schema:
root |-- uid: string (nullable = true) |-- user_code: integer (nullable = true) |-- tag_id: string (nullable = true) |-- tag_code: integer (nullable = true) |-- tag_name: string (nullable = true) |-- weight: double (nullable = true) |-- card_u: long (nullable = true) |-- card_t: long (nullable = true) |-- dt: string (nullable = true) |-- pt: string (nullable = true)
In [8]:
Copied!
# Split into train/test with the configured options.
train_data, test_data = split_dataset(
    dataset,
    train_test_split=args.train_test_split,
    split_by_user=args.split_by_user,
    add_negative_samples=args.add_negative_samples,
    seed=args.seed,
    neg_sample_ratio=args.neg_sample_ratio
)
# (duplicated cell body from the notebook export)
train_data, test_data = split_dataset(
    dataset,
    train_test_split=args.train_test_split,
    split_by_user=args.split_by_user,
    add_negative_samples=args.add_negative_samples,
    seed=args.seed,
    neg_sample_ratio=args.neg_sample_ratio
)
2025-03-29 17:51:34.311 | INFO | __main__:split_dataset:53 - 数据中总标签数:50765
设置训练参数¶
In [9]:
Copied!
# Assemble ALS hyper-parameters from the parsed/default Args.
params = {
    "maxIter": args.maxIter,
    "regParam": args.regParam,
    "userCol": 'user_code',
    "itemCol": 'tag_code',
    "ratingCol": 'weight',
    "rank": args.rank,
    "coldStartStrategy": 'drop',          # drop predictions for unseen users/items
    "implicitPrefs": args.implicitPrefs,
    "alpha": args.alpha,
    "numUserBlocks": args.als_blocks,
    "numItemBlocks": args.als_blocks,
    "seed": 14683
}
als = ALS(**params)
logger.info("模型参数:{}", params)
# (duplicated cell body from the notebook export)
params = {
    "maxIter": args.maxIter,
    "regParam": args.regParam,
    "userCol": 'user_code',
    "itemCol": 'tag_code',
    "ratingCol": 'weight',
    "rank": args.rank,
    "coldStartStrategy": 'drop',
    "implicitPrefs": args.implicitPrefs,
    "alpha": args.alpha,
    "numUserBlocks": args.als_blocks,
    "numItemBlocks": args.als_blocks,
    "seed": 14683
}
als = ALS(**params)
logger.info("模型参数:{}", params)
2025-03-29 17:52:18.894 | INFO | __main__:<module>:18 - 模型参数:{'maxIter': 10, 'regParam': 0.01, 'userCol': 'user_code', 'itemCol': 'tag_code', 'ratingCol': 'weight', 'rank': 24, 'coldStartStrategy': 'drop', 'implicitPrefs': True, 'alpha': 500, 'numUserBlocks': 10, 'numItemBlocks': 10, 'seed': 14683}
In [10]:
Copied!
# Rating transforms applied to `weight` before training.
transform = {
    "power5": expr("pow(weight, 5)"),
    "power4": expr("pow(weight, 4)"),
    "power3": expr("pow(weight, 3)"),
    "power2": expr("pow(weight, 2)"),
    "logit": expr("ln(weight/(1.001 - weight)) + 0.5"),   # 1.001 avoids division by zero at weight=1
    "none": col("weight")
}
transform_name = args.transform
logger.info(f"使用转换:{transform_name} -> {transform[transform_name]}")
train_data = train_data.withColumn("weight", transform[transform_name])
# (duplicated cell body from the notebook export)
transform = {
    "power5": expr("pow(weight, 5)"),
    "power4": expr("pow(weight, 4)"),
    "power3": expr("pow(weight, 3)"),
    "power2": expr("pow(weight, 2)"),
    "logit": expr("ln(weight/(1.001 - weight)) + 0.5"),
    "none": col("weight")
}
transform_name = args.transform
logger.info(f"使用转换:{transform_name} -> {transform[transform_name]}")
train_data = train_data.withColumn("weight", transform[transform_name])
2025-03-29 17:52:18.966 | INFO | __main__:<module>:12 - 使用转换:power4 -> Column<'pow(weight, 4)'>
加载ALS模型向量¶
加载上一次训练的用户和物品向量
In [11]:
Copied!
model_path = "viewfs:///user_ext/weibo_bigdata_vf/yandi/als/checkpoints/dt=20241015/pt=long_obj/train.d_24.imp_False.reg_0.01.a_200.it_10.tf_none.sub_5"
model_path = "viewfs:///user_ext/weibo_bigdata_vf/yandi/als/checkpoints/dt=20250306/pt=long_obj/train.d_24.imp_True.reg_0.01.a_500.it_10.tf_power4.sub_5"
model = ALSModel.load(model_path)
model_path = "viewfs:///user_ext/weibo_bigdata_vf/yandi/als/checkpoints/dt=20241015/pt=long_obj/train.d_24.imp_False.reg_0.01.a_200.it_10.tf_none.sub_5"
model_path = "viewfs:///user_ext/weibo_bigdata_vf/yandi/als/checkpoints/dt=20250306/pt=long_obj/train.d_24.imp_True.reg_0.01.a_500.it_10.tf_power4.sub_5"
model = ALSModel.load(model_path)
增量训练¶
提供 Python 封装类 (IncrementalALS),继承自 pyspark.ml.recommendation.ALS,提供与原生 ALS 相似的 API 体验。
In [12]:
Copied!
from incremental_als_wrapper import IncrementalALS
from incremental_als_wrapper import IncrementalALS
In [13]:
Copied!
# Build the incremental ALS estimator with the same hyper-parameters as the base ALS.
als_inc = IncrementalALS(**params)
# (duplicated cell body from the notebook export)
als_inc = IncrementalALS(**params)
In [14]:
Copied!
# Warm-start from the previously trained factors, then run only 2 ALS iterations.
als_inc.setInitialUserFactors(model.userFactors)
als_inc.setInitialItemFactors(model.itemFactors)
als_inc.setMaxIter(2)
# (duplicated cell body from the notebook export)
als_inc.setInitialUserFactors(model.userFactors)
als_inc.setInitialItemFactors(model.itemFactors)
als_inc.setMaxIter(2)
/data0/spark/spark-3.2.0-bin/python/pyspark/sql/context.py:127: FutureWarning: Deprecated in 3.0.0. Use SparkSession.builder.getOrCreate() instead. FutureWarning
Out[14]:
IncrementalALS_8ff7af6856a2
In [15]:
Copied!
# Incremental fit: continues from the loaded factors on the new training data.
model_update = als_inc.fit(train_data)
# (duplicated cell body from the notebook export)
model_update = als_inc.fit(train_data)
测试一致性¶
我们假设迭代两次后,物品/用户向量应该与训练前是基本一致的,为此,我们拿到训练前后相同id对应的Factors,计算两个Factors的cosine_similarity
In [16]:
Copied!
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType
import pandas as pd
import numpy as np
# --- Pandas UDF: element-wise cosine similarity between two vector columns ---
@pandas_udf(DoubleType())
def cosine_similarity_pandas_impl(vec1_series: pd.Series, vec2_series: pd.Series) -> pd.Series:
    """
    Calculates cosine similarity between two pandas Series of vectors using NumPy.
    """
    def _pair_similarity(a, b):
        # Missing vectors contribute a similarity of 0.0.
        if a is None or b is None:
            return 0.0
        # Accept pyspark Vector types transparently.
        a = a.toArray() if hasattr(a, "toArray") else a
        b = b.toArray() if hasattr(b, "toArray") else b
        u = np.array(a)
        v = np.array(b)
        norm_u = np.linalg.norm(u)
        norm_v = np.linalg.norm(v)
        if norm_u == 0.0 or norm_v == 0.0:
            return 0.0
        cos = np.dot(u, v) / (norm_u * norm_v)
        # Clamp tiny float excursions outside [-1, 1].
        return float(np.clip(cos, -1.0, 1.0))
    return pd.Series([_pair_similarity(a, b) for a, b in zip(vec1_series, vec2_series)])
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType
import pandas as pd
import numpy as np
# --- Pandas UDF: element-wise cosine similarity between two vector columns ---
@pandas_udf(DoubleType())
def cosine_similarity_pandas_impl(vec1_series: pd.Series, vec2_series: pd.Series) -> pd.Series:
    """
    Calculates cosine similarity between two pandas Series of vectors using NumPy.
    """
    def _pair_similarity(a, b):
        # Missing vectors contribute a similarity of 0.0.
        if a is None or b is None:
            return 0.0
        # Accept pyspark Vector types transparently.
        a = a.toArray() if hasattr(a, "toArray") else a
        b = b.toArray() if hasattr(b, "toArray") else b
        u = np.array(a)
        v = np.array(b)
        norm_u = np.linalg.norm(u)
        norm_v = np.linalg.norm(v)
        if norm_u == 0.0 or norm_v == 0.0:
            return 0.0
        cos = np.dot(u, v) / (norm_u * norm_v)
        # Clamp tiny float excursions outside [-1, 1].
        return float(np.clip(cos, -1.0, 1.0))
    return pd.Series([_pair_similarity(a, b) for a, b in zip(vec1_series, vec2_series)])
In [17]:
Copied!
# Make the pandas UDF callable from SQL as cosine_similarity(...).
spark.udf.register("cosine_similarity", cosine_similarity_pandas_impl)
# (duplicated cell body from the notebook export)
spark.udf.register("cosine_similarity", cosine_similarity_pandas_impl)
Out[17]:
<function __main__.cosine_similarity_pandas_impl(vec1_series: pandas.core.series.Series, vec2_series: pandas.core.series.Series) -> pandas.core.series.Series>
In [18]:
Copied!
# Expose pre-incremental ("old") and post-incremental ("new") item factors for SQL comparison.
model.itemFactors.createOrReplaceTempView("factors_old")
model_update.itemFactors.createOrReplaceTempView("factors_new")
# (duplicated cell body from the notebook export)
model.itemFactors.createOrReplaceTempView("factors_old")
model_update.itemFactors.createOrReplaceTempView("factors_new")
In [19]:
Copied!
spark.sql(f"""
create or replace temp view factor_similarity as
select
t1.id,
cosine_similarity(t1.features, t2.features) AS cosine_similarity
from factors_old t1
join factors_new t2
on t1.id=t2.id
""")
spark.sql(f"""
create or replace temp view factor_similarity as
select
t1.id,
cosine_similarity(t1.features, t2.features) AS cosine_similarity
from factors_old t1
join factors_new t2
on t1.id=t2.id
""")
Out[19]:
DataFrame[]
In [20]:
Copied!
spark.sql("select * from factor_similarity limit 10").show()
spark.sql("select * from factor_similarity limit 10").show()
+---+------------------+ | id| cosine_similarity| +---+------------------+ | 10| 0.999236524105072| | 20|0.9979416728019714| | 30|0.9974139928817749| | 40|0.9991188645362854| | 60|0.9989914894104004| | 70|0.9979536533355713| | 80|0.9982348680496216| |100|0.9929916262626648| |110| 0.997641921043396| |120|0.9980128407478333| +---+------------------+
In [21]:
Copied!
spark.sql("select count(1) from factor_similarity").show()
spark.sql("select count(1) from factor_similarity").show()
+--------+ |count(1)| +--------+ | 50548| +--------+
In [22]:
Copied!
spark.sql("""
select percentile(cosine_similarity, array(0.05, 0.25, 0.5, 0.75, 0.95)) as percentiles
from factor_similarity
""").collect()
spark.sql("""
select percentile(cosine_similarity, array(0.05, 0.25, 0.5, 0.75, 0.95)) as percentiles
from factor_similarity
""").collect()
Out[22]:
[Row(percentiles=[0.9887478619813919, 0.9944532811641693, 0.9963980615139008, 0.9976805597543716, 0.9987210184335709])]
In [23]:
Copied!
# Repeat the comparison for user factors (reuses the same view names).
model.userFactors.createOrReplaceTempView("factors_old")
model_update.userFactors.createOrReplaceTempView("factors_new")
# (duplicated cell body from the notebook export)
model.userFactors.createOrReplaceTempView("factors_old")
model_update.userFactors.createOrReplaceTempView("factors_new")
In [24]:
Copied!
spark.sql(f"""
create or replace temp view factor_similarity as
select
t1.id,
cosine_similarity(t1.features, t2.features) AS cosine_similarity
from factors_old t1
join factors_new t2
on t1.id=t2.id
""")
spark.sql(f"""
create or replace temp view factor_similarity as
select
t1.id,
cosine_similarity(t1.features, t2.features) AS cosine_similarity
from factors_old t1
join factors_new t2
on t1.id=t2.id
""")
Out[24]:
DataFrame[]
In [25]:
Copied!
spark.sql("select * from factor_similarity limit 10").show()
spark.sql("select * from factor_similarity limit 10").show()
+---------+------------------+ | id| cosine_similarity| +---------+------------------+ | 12163770|0.9984058737754822| |112196540|0.9994266033172607| | 14554750|0.9991267323493958| |270389530|0.9987061619758606| |259931230|0.9982590079307556| |159651070|0.9976629018783569| |263189300|0.9958972334861755| | 11457000|0.9993614554405212| |116713140|0.9987000226974487| | 22184200|0.9995443820953369| +---------+------------------+
In [26]:
Copied!
spark.sql("select count(1) from factor_similarity").show()
spark.sql("select count(1) from factor_similarity").show()
+--------+ |count(1)| +--------+ | 803097| +--------+
In [27]:
Copied!
spark.sql("""
select percentile(cosine_similarity, array(0.05, 0.25, 0.5, 0.75, 0.95)) as percentiles
from factor_similarity
""").collect()
spark.sql("""
select percentile(cosine_similarity, array(0.05, 0.25, 0.5, 0.75, 0.95)) as percentiles
from factor_similarity
""").collect()
Out[27]:
[Row(percentiles=[0.9961116194725036, 0.9981582760810852, 0.9988576173782349, 0.9992614388465881, 0.9995482563972473])]