MongoDB聚合管道详解
约 1559 字大约 5 分钟
mongodbaggregation
2025-05-17
MongoDB 聚合管道(Aggregation Pipeline)是一套强大的数据处理框架,通过多个阶段(Stage)的组合实现复杂的数据转换和分析。本文详细介绍各阶段的用法,并与 SQL 进行对比。
聚合管道概览
聚合管道的设计理念类似 Unix 管道:每个阶段接收文档流作为输入,处理后输出到下一个阶段。
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $group: { _id: "$userId", totalAmount: { $sum: "$amount" } } },
{ $sort: { totalAmount: -1 } },
{ $limit: 10 }
])核心阶段详解
$match - 过滤
与 find() 的查询语法相同,应尽早使用以减少后续阶段的数据量。
// 基本过滤
{ $match: { status: "active", age: { $gte: 18 } } }
// 正则匹配
{ $match: { name: /^A/i } }
// 复杂条件
{ $match: {
$or: [
{ category: "electronics" },
{ price: { $gt: 1000 } }
]
}}优化提示:$match 放在管道最前面可以利用索引,放在 $group 后面则不能。
$group - 分组聚合
// 按字段分组并计算聚合值
db.orders.aggregate([
{
$group: {
_id: "$category", // 分组键
totalRevenue: { $sum: "$amount" },
avgAmount: { $avg: "$amount" },
maxAmount: { $max: "$amount" },
minAmount: { $min: "$amount" },
orderCount: { $sum: 1 }, // 计数
firstOrder: { $first: "$createdAt" },
lastOrder: { $last: "$createdAt" },
allStatuses: { $addToSet: "$status" }, // 去重集合
items: { $push: "$itemName" } // 数组
}
}
])
// 多字段分组
{
$group: {
_id: { year: { $year: "$createdAt" }, category: "$category" },
total: { $sum: "$amount" }
}
}
// 全局聚合(不分组)
{
$group: {
_id: null,
grandTotal: { $sum: "$amount" },
docCount: { $sum: 1 }
}
}$sort - 排序
// 升序 1, 降序 -1
{ $sort: { totalAmount: -1, name: 1 } }
// 在 $group 之前使用 $sort 会影响 $first/$last 的结果
db.orders.aggregate([
{ $sort: { createdAt: 1 } },
{ $group: {
_id: "$userId",
firstOrderDate: { $first: "$createdAt" },
lastOrderDate: { $last: "$createdAt" }
}}
])$project - 投影/重塑
db.users.aggregate([
{
$project: {
_id: 0, // 排除 _id
fullName: { $concat: ["$firstName", " ", "$lastName"] },
ageGroup: {
$switch: {
branches: [
{ case: { $lt: ["$age", 18] }, then: "minor" },
{ case: { $lt: ["$age", 60] }, then: "adult" },
],
default: "senior"
}
},
year: { $year: "$createdAt" },
month: { $month: "$createdAt" },
isActive: { $eq: ["$status", "active"] },
discountedPrice: {
$multiply: ["$price", { $subtract: [1, "$discountRate"] }]
}
}
}
])$lookup - 关联查询 (LEFT JOIN)
// 基本关联(等值连接)
db.orders.aggregate([
{
$lookup: {
from: "users", // 目标集合
localField: "userId", // 本地字段
foreignField: "_id", // 目标字段
as: "userInfo" // 输出数组字段名
}
},
{ $unwind: "$userInfo" } // 展开数组为对象
])
// 带管道的高级关联
db.orders.aggregate([
{
$lookup: {
from: "products",
let: { orderItems: "$items" },
pipeline: [
{ $match: {
$expr: { $in: ["$_id", "$$orderItems"] }
}},
{ $project: { name: 1, price: 1 } }
],
as: "productDetails"
}
}
])$unwind - 展开数组
// 原始文档: { _id: 1, tags: ["redis", "database", "cache"] }
db.articles.aggregate([
{ $unwind: "$tags" }
])
// 展开后生成 3 个文档:
// { _id: 1, tags: "redis" }
// { _id: 1, tags: "database" }
// { _id: 1, tags: "cache" }
// 保留空数组/缺失字段的文档
{
$unwind: {
path: "$tags",
preserveNullAndEmptyArrays: true,
includeArrayIndex: "tagIndex"
}
}$facet - 多面聚合
同时执行多个聚合管道,返回多组结果。
db.products.aggregate([
{
$facet: {
// 按类别统计
"byCategory": [
{ $group: { _id: "$category", count: { $sum: 1 } } },
{ $sort: { count: -1 } }
],
// 价格统计
"priceStats": [
{ $group: {
_id: null,
avgPrice: { $avg: "$price" },
minPrice: { $min: "$price" },
maxPrice: { $max: "$price" }
}}
],
// 价格分布
"priceRanges": [
{ $bucket: {
groupBy: "$price",
boundaries: [0, 100, 500, 1000, 5000],
default: "5000+",
output: { count: { $sum: 1 } }
}}
]
}
}
])$graphLookup - 图查询/递归查询
用于查询层级结构或图关系数据(类似 SQL 的递归 CTE)。
// 组织架构树查询
db.employees.aggregate([
{ $match: { name: "CEO" } },
{
$graphLookup: {
from: "employees",
startWith: "$_id",
connectFromField: "_id",
connectToField: "managerId",
as: "allSubordinates",
maxDepth: 5, // 最大递归深度
depthField: "level", // 记录递归层级
restrictSearchWithMatch: { // 限制搜索范围
status: "active"
}
}
}
])累加器操作符
| 操作符 | 说明 | $group 中 | $project 中 |
|---|---|---|---|
$sum | 求和/计数 | 是 | 否 |
$avg | 平均值 | 是 | 否 |
$min / $max | 最小/最大值 | 是 | 否 |
$first / $last | 首个/末个值 | 是 | 否 |
$push | 追加到数组 | 是 | 否 |
$addToSet | 追加到集合(去重) | 是 | 否 |
$stdDevPop | 总体标准差 | 是 | 否 |
$stdDevSamp | 样本标准差 | 是 | 否 |
与 SQL 对比
SQL vs Pipeline 示例
-- SQL
SELECT category, COUNT(*) AS count, AVG(price) AS avg_price
FROM products
WHERE status = 'active'
GROUP BY category
HAVING COUNT(*) > 10
ORDER BY avg_price DESC
LIMIT 5;// MongoDB Pipeline
db.products.aggregate([
{ $match: { status: "active" } },
{ $group: {
_id: "$category",
count: { $sum: 1 },
avgPrice: { $avg: "$price" }
}},
{ $match: { count: { $gt: 10 } } },
{ $sort: { avgPrice: -1 } },
{ $limit: 5 },
{ $project: {
_id: 0,
category: "$_id",
count: 1,
avgPrice: { $round: ["$avgPrice", 2] }
}}
])优化策略
管道优化规则
// 大数据集聚合,允许使用磁盘
db.bigCollection.aggregate([
{ $match: { date: { $gte: ISODate("2025-01-01") } } },
{ $group: { _id: "$userId", total: { $sum: "$amount" } } },
{ $sort: { total: -1 } }
], { allowDiskUse: true })
// 使用 $merge 将聚合结果写入另一个集合
db.orders.aggregate([
{ $group: { _id: "$userId", total: { $sum: "$amount" } } },
{ $merge: {
into: "user_totals",
whenMatched: "replace",
whenNotMatched: "insert"
}}
])explain 分析
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $group: { _id: "$userId", total: { $sum: "$amount" } } }
]).explain("executionStats")
// 关注
// - 是否使用了索引 (IXSCAN vs COLLSCAN)
// - 是否利用了管道合并优化
// - 内存使用是否超过 100MB 限制MongoDB 自动优化
| 优化 | 说明 |
|---|---|
$match + $match | 合并为一个 $match |
$sort + $sort | 保留最后一个 $sort |
$project + $match | $match 提前到 $project 之前 |
$sort + $limit | 合并为 Top-N 排序(更高效) |
$lookup + $unwind | 合并为更高效的关联操作 |
实战案例
销售漏斗分析
db.events.aggregate([
{ $match: {
eventTime: {
$gte: ISODate("2025-05-01"),
$lt: ISODate("2025-06-01")
}
}},
{ $group: {
_id: "$userId",
events: { $addToSet: "$eventType" }
}},
{ $project: {
viewed: { $in: ["page_view", "$events"] },
addedToCart: { $in: ["add_to_cart", "$events"] },
purchased: { $in: ["purchase", "$events"] }
}},
{ $group: {
_id: null,
viewCount: { $sum: { $cond: ["$viewed", 1, 0] } },
cartCount: { $sum: { $cond: ["$addedToCart", 1, 0] } },
purchaseCount: { $sum: { $cond: ["$purchased", 1, 0] } }
}}
])总结
- 聚合管道是 MongoDB 中数据分析的核心工具,通过阶段组合实现复杂计算
$match应尽量放在管道最前端,利用索引减少数据扫描$lookup实现了类似 SQL JOIN 的功能,$graphLookup支持递归查询$facet可以在一次查询中执行多个并行聚合- 大数据集使用
allowDiskUse: true防止内存溢出 - MongoDB 会自动进行部分管道优化,如合并相邻的
$match阶段
贡献者
更新日志
2026/3/14 13:09
查看所有更新日志
9f6c2-feat: organize wiki content and refresh site setup于