事务处理模块原理说明
基于1.31 本文档深入解析事务处理模块的设计原理和核心实现,涵盖事务管理、嵌套事务、保存点机制等核心内容。通过阅读本文档,您将能够完全理解 GORM 事务处理模块的工作原理,无需额外阅读源码。
目录
一、设计原理
1.1 模块定位
在整体架构中的位置
事务处理模块是 GORM 的数据一致性保障层,负责管理数据库事务的创建、提交和回滚。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| ┌─────────────────────────────────────────────────────────────┐ │ 用户操作 │ │ db.Transaction(func(tx *gorm.DB) error { │ │ // 多个数据库操作 │ │ tx.Create(&user) │ │ tx.Create(&order) │ │ return nil │ │ }) │ └────────────────────────┬────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ 事务管理 (本模块) ★ │ │ ┌────────────────────────────────────────────────────┐ │ │ │ Transaction() - 事务块执行 │ │ │ │ ├─ 检查连接池类型 │ │ │ │ ├─ 创建事务 / 保存点 (嵌套) │ │ │ │ ├─ 执行用户函数 │ │ │ │ └─ 提交 / 回滚 (根据结果) │ │ │ └────────────────────────────────────────────────────┘ │ │ │ │ 基础操作: │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ Begin │ │ Commit │ │ Rollback │ │SavePoint │ │ │ │ (开始) │ │ (提交) │ │ (回滚) │ │ (保存点) │ │ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ └────────────────────────┬────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ 回调系统集成 │ │ ┌────────────────────────────────────────────────────┐ │ │ │ BeginTransaction 回调 (create/update/delete) │ │ │ │ CommitOrRollbackTransaction 回调 │ │ │ └────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────┘
|
核心价值
事务处理模块的核心价值在于:
- 数据一致性: 保证一组操作要么全部成功,要么全部失败
- 原子性: 支持复杂业务逻辑的事务性操作
- 嵌套支持: 通过保存点实现嵌套事务
- 错误处理: 自动处理 panic 和错误,确保资源释放
1.2 设计目的
问题 1: 如何保证多个数据库操作的原子性?
- 挑战: 多个操作需要作为一个整体执行
- 挑战: 中途出错需要撤销所有已执行的操作
- 挑战: 需要正确处理 panic 和错误
解决方案: 事务块 + 自动回滚机制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
db.Exec("UPDATE accounts SET balance = balance - 100 WHERE id = ?", accountA) db.Exec("UPDATE accounts SET balance = balance + 100 WHERE id = ?", accountB)
db.Transaction(func(tx *gorm.DB) error { if err := tx.Exec("UPDATE accounts SET balance = balance - 100 WHERE id = ?", accountA).Error; err != nil { return err } if err := tx.Exec("UPDATE accounts SET balance = balance + 100 WHERE id = ?", accountB).Error; err != nil { return err } return nil })
|
问题 2: 如何支持嵌套事务?
- 挑战: 数据库不一定支持真正的嵌套事务
- 挑战: 需要在外层事务中建立”逻辑上的”嵌套
- 挑战: 内层回滚不应影响外层事务
解决方案: 保存点 (SavePoint) 机制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| db.Transaction(func(tx1 *gorm.DB) error { tx1.Create(&user)
tx1.Transaction(func(tx2 *gorm.DB) error { tx2.Create(&order)
if someError { return errors.New("inner error") } return nil })
tx1.Create(&log) return nil })
|
问题 3: 如何默认事务和显式事务的平衡?
- 挑战: 每个操作都用事务会影响性能
- 挑战: 不用事务无法保证一致性
- 挑战: 需要灵活的配置选项
解决方案: 可配置的默认事务 + 显式事务 API
1 2 3 4 5 6 7 8 9 10 11
| db.Create(&user)
db.Session(&gorm.Session{SkipDefaultTransaction: true}).Create(&user)
db.Transaction(func(tx *gorm.DB) error { return nil })
|
1.3 结构安排依据
3 天学习时间的科学分配
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| Day 1: 事务基础 (核心概念) 目标: 理解事务的基本概念和使用 重点: - ACID 特性 - Transaction API - Begin/Commit/Rollback
Day 2: 嵌套事务 (高级特性) 目标: 理解嵌套事务的实现 重点: - 保存点机制 - 嵌套事务规则 - 错误传播
Day 3: 实际应用 (最佳实践) 目标: 掌握事务的实际应用 重点: - 业务场景分析 - 性能优化 - 错误处理
|
由浅入深的学习路径
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| 第 1 层: 使用层 (如何使用) ├─ Transaction 事务块 ├─ Begin/Commit/Rollback 手动控制 └─ SavePoint 保存点
第 2 层: 机制层 (如何工作) ├─ 事务如何开始 ├─ 嵌套如何实现 └─ 回滚如何触发
第 3 层: 原理层 (为什么这样设计) ├─ ACID 理论 ├─ 保存点原理 └─ 错误处理策略
|
1.4 与其他模块的逻辑关系
依赖关系
- 依赖连接池: 需要支持事务的连接池
- 依赖回调系统: 默认事务通过回调实现
- 依赖查询构建: 事务内使用链式 API
支撑关系
- 支撑所有写操作: Create/Update/Delete 的默认事务包装
- 支撑数据一致性: 保证操作的原子性
模块交互图
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| 事务模块 ↔ 其他模块:
┌─────────────┐ │ 连接池 │ → 提供事务连接 │ ConnPool │ - TxBeginner 接口 └──────┬──────┘ - TxCommitter 接口 │ ▼ ┌─────────────────────────────┐ │ 事务模块 │ │ ┌───────────────────────┐ │ │ │ Transaction() │ │ │ │ Begin/Commit/Rollback │ │ │ │ SavePoint │ │ │ └───────────────────────┘ │ └──────────┬──────────────────┘ │ ├─→ 回调系统 (默认事务) ├─→ 查询构建 (事务内操作) └─→ 错误处理 (回滚触发)
|
二、核心原理
2.1 关键概念
概念 1: Transaction 结构
定义: Transaction 是事务的完整执行流程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| func (db *DB) Transaction(fc func(tx *DB) error, opts ...*sql.TxOptions) (err error) { panicked := true
if committer, ok := db.Statement.ConnPool.(TxCommitter); ok { if !db.DisableNestedTransaction { spID := new(maphash.Hash).Sum64() err = db.SavePoint(fmt.Sprintf("sp%d", spID)).Error defer func() { if panicked || err != nil { db.RollbackTo(fmt.Sprintf("sp%d", spID)) } }() } err = fc(db.Session(&Session{NewDB: db.clone == 1})) } else { tx := db.Begin(opts...) defer func() { if panicked || err != nil { tx.Rollback() } }() if err = fc(tx); err == nil { panicked = false return tx.Commit().Error } }
panicked = false return }
|
执行流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| Transaction(fc, opts): │ ├─► 1. 检查连接池类型 │ └─ db.Statement.ConnPool.(TxCommitter) │ ├─ 是: 已在事务中 → 嵌套事务 │ └─ 否: 不在事务中 → 新建事务 │ ├─► 2a. 嵌套事务路径 │ ├─ 创建保存点 (SAVEPOINT spXXX) │ ├─ 设置 defer 回滚到保存点 │ ├─ 执行用户函数 fc(db) │ └─ 根据结果释放或回滚保存点 │ ├─► 2b. 新建事务路径 │ ├─ 开始事务 (BEGIN) │ ├─ 设置 defer 回滚事务 │ ├─ 执行用户函数 fc(tx) │ └─ 根据结果提交或回滚 │ └─► 3. 返回错误
|
概念 2: Begin/Commit/Rollback
定义: 手动事务控制的三个核心方法。
Begin 实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| func (db *DB) Begin(opts ...*sql.TxOptions) *DB { var ( tx = db.getInstance().Session(&Session{ Context: db.Statement.Context, NewDB: db.clone == 1, }) opt *sql.TxOptions err error )
if len(opts) > 0 { opt = opts[0] }
ctx := tx.Statement.Context if db.DefaultTransactionTimeout > 0 { if _, ok := ctx.Deadline(); !ok { ctx, _ = context.WithTimeout(ctx, db.DefaultTransactionTimeout) } }
switch beginner := tx.Statement.ConnPool.(type) { case TxBeginner: tx.Statement.ConnPool, err = beginner.BeginTx(ctx, opt) case ConnPoolBeginner: tx.Statement.ConnPool, err = beginner.BeginTx(ctx, opt) default: err = ErrInvalidTransaction }
if err != nil { tx.AddError(err) }
return tx }
|
Commit 实现:
1 2 3 4 5 6 7 8
| func (db *DB) Commit() *DB { if committer, ok := db.Statement.ConnPool.(TxCommitter); ok && committer != nil { db.AddError(committer.Commit()) } else { db.AddError(ErrInvalidTransaction) } return db }
|
Rollback 实现:
1 2 3 4 5 6 7 8
| func (db *DB) Rollback() *DB { if committer, ok := db.Statement.ConnPool.(TxCommitter); ok && committer != nil { db.AddError(committer.Rollback()) } else { db.AddError(ErrInvalidTransaction) } return db }
|
使用示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| err := db.Transaction(func(tx *gorm.DB) error { if err := tx.Create(&user).Error; err != nil { return err } if err := tx.Create(&order).Error; err != nil { return err } return nil })
tx := db.Begin() defer func() { if r := recover(); r != nil { tx.Rollback() panic(r) } }()
if err := tx.Create(&user).Error; err != nil { tx.Rollback() return err }
if err := tx.Create(&order).Error; err != nil { tx.Rollback() return err }
tx.Commit()
|
概念 3: 保存点 (SavePoint)
定义: 保存点是事务中的标记点,可以回滚到该点而不影响整个事务。
实现原理:
1 2 3 4 5 6 7 8 9 10 11
| func (db *DB) SavePoint(name string) *DB { db.Exec("SAVEPOINT ?", name) return db }
func (db *DB) RollbackTo(name string) *DB { db.Exec("ROLLBACK TO SAVEPOINT ?", name) return db }
|
嵌套事务实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| 外层事务: BEGIN ┌─────────────────────────────────┐ │ 内层事务 1 │ │ SAVEPOINT sp1 │ │ 操作 A │ │ 操作 B │ │ 如果失败: ROLLBACK TO sp1 │ │ 如果成功: RELEASE sp1 │ └─────────────────────────────────┘ ┌─────────────────────────────────┐ │ 内层事务 2 │ │ SAVEPOINT sp2 │ │ 操作 C │ │ 操作 D │ │ 如果失败: ROLLBACK TO sp2 │ │ 如果成功: RELEASE sp2 │ └─────────────────────────────────┘ COMMIT
关键特性: - 内层失败不会导致外层回滚 - 每个保存点独立 - 可以部分回滚
|
代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| db.Transaction(func(tx1 *gorm.DB) error { tx1.Create(&User{Name: "Alice"})
err := tx1.Transaction(func(tx2 *gorm.DB) error { tx2.Create(&Order{UserID: 1, Amount: 100}) if someCondition { return errors.New("rollback inner 1") } return nil })
err = tx1.Transaction(func(tx2 *gorm.DB) error { tx2.Create(&Payment{UserID: 1, Amount: 100}) return nil })
return nil })
|
概念 4: 默认事务机制
定义: Create/Update/Delete 操作默认使用事务包装。
回调实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
func BeginTransaction(db *gorm.DB) { if !db.Config.SkipDefaultTransaction && db.Error == nil { if tx := db.Begin(); tx.Error == nil { db.Statement.ConnPool = tx.Statement.ConnPool db.InstanceSet("gorm:started_transaction", true) } else if tx.Error == gorm.ErrInvalidTransaction { tx.Error = nil } else { db.Error = tx.Error } } }
func CommitOrRollbackTransaction(db *gorm.DB) { if !db.Config.SkipDefaultTransaction { if _, ok := db.InstanceGet("gorm:started_transaction"); ok { if db.Error != nil { db.Rollback() } else { db.Commit() } db.Statement.ConnPool = db.ConnPool } } }
|
回调注册:
1 2 3 4 5 6 7 8 9 10 11
| db.Callback().Create().Before("gorm:create").Register("gorm:begin_transaction", BeginTransaction) db.Callback().Create().After("gorm:create").Register("gorm:commit_or_rollback_transaction", CommitOrRollbackTransaction)
db.Callback().Update().Before("gorm:update").Register("gorm:begin_transaction", BeginTransaction) db.Callback().Update().After("gorm:update").Register("gorm:commit_or_rollback_transaction", CommitOrRollbackTransaction)
db.Callback().Delete().Before("gorm:delete").Register("gorm:begin_transaction", BeginTransaction) db.Callback().Delete().After("gorm:delete").Register("gorm:commit_or_rollback_transaction", CommitOrRollbackTransaction)
|
执行流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| Create(&user) 执行流程: │ ├─► 1. before:create 回调链 │ └─► BeginTransaction │ ├─ 检查 SkipDefaultTransaction │ ├─ 如果 false: BEGIN │ └─ 设置 started_transaction 标记 │ ├─► 2. create 回调 │ └─► 执行 INSERT 操作 │ ├─ 成功: 继续 │ └─ 失败: 设置 db.Error │ ├─► 3. after:create 回调链 │ └─► CommitOrRollbackTransaction │ ├─ 检查 started_transaction 标记 │ ├─ 检查 db.Error │ ├─ 有错误: ROLLBACK │ └─ 无错误: COMMIT │ └─► 4. 返回结果
|
跳过默认事务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{ SkipDefaultTransaction: true, })
db.Session(&gorm.Session{ SkipDefaultTransaction: true, }).Create(&user)
|
概念 5: 错误处理与 Panic 恢复
定义: Transaction 函数自动处理错误和 panic。
错误处理机制:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| func (db *DB) Transaction(fc func(tx *DB) error, opts ...*sql.TxOptions) (err error) { panicked := true
defer func() { if panicked || err != nil { if committer != nil { committer.Rollback() } } }()
err = fc(db.Session(&Session{NewDB: db.clone == 1}))
panicked = false return }
|
Panic 恢复:
1 2 3 4 5 6 7 8 9 10 11 12
| db.Transaction(func(tx *gorm.DB) error { tx.Create(&user) panic("something went wrong!") })
|
错误传播:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| db.Transaction(func(tx1 *gorm.DB) error { tx1.Create(&user)
err := tx1.Transaction(func(tx2 *gorm.DB) error { return errors.New("inner error") })
if err != nil { return err }
return nil })
|
2.2 理论基础
理论 1: ACID 特性
ACID 是事务的四个基本特性:
| 特性 |
全称 |
说明 |
GORM 实现 |
| Atomicity |
原子性 |
事务中的操作要么全部成功,要么全部失败 |
通过 Transaction() + 自动回滚实现 |
| Consistency |
一致性 |
事务执行前后,数据库从一个一致状态变到另一个一致状态 |
通过事务保证 |
| Isolation |
隔离性 |
并发事务之间互不干扰 |
通过数据库的隔离级别实现 |
| Durability |
持久性 |
事务提交后,修改永久保存 |
通过数据库的持久化实现 |
原子性实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| db.Transaction(func(tx *gorm.DB) error { var accountA Account if err := tx.First(&accountA, 1).Error; err != nil { return err }
var accountB Account if err := tx.First(&accountB, 2).Error; err != nil { return err }
accountA.Balance -= 100 if err := tx.Save(&accountA).Error; err != nil { return err }
accountB.Balance += 100 if err := tx.Save(&accountB).Error; err != nil { return err }
log := TransactionLog{ FromAccountID: 1, ToAccountID: 2, Amount: 100, } if err := tx.Create(&log).Error; err != nil { return err }
return nil })
|
理论 2: 保存点原理
保存点的作用:
- 部分回滚: 允许回滚到事务中的某个点
- 嵌套事务: 模拟嵌套事务行为
- 错误恢复: 在不放弃整个事务的情况下恢复错误
SQL 实现:
1 2 3 4 5 6 7 8
| SAVEPOINT sp1;
ROLLBACK TO SAVEPOINT sp1;
RELEASE SAVEPOINT sp1;
|
GORM 封装:
1 2 3 4 5 6 7 8 9
| db.SavePoint("my_savepoint")
db.RollbackTo("my_savepoint")
spID := new(maphash.Hash).Sum64() savePointName := fmt.Sprintf("sp%d", spID)
|
嵌套层级:
1 2 3 4 5 6 7 8 9 10 11 12
| 事务开始 │ ├─ 保存点 1 (sp1) │ ├─ 操作 A │ ├─ 保存点 2 (sp2) │ │ ├─ 操作 B │ │ ├─ 操作 C │ │ └─ 回滚到 sp2 ← B 和 C 被撤销 │ ├─ 操作 D │ └─ 回滚到 sp1 ← A、B、C、D 都被撤销 │ └─ 提交
|
理论 3: 隔离级别
数据库隔离级别:
| 级别 |
脏读 |
不可重复读 |
幻读 |
说明 |
| Read Uncommitted |
可能 |
可能 |
可能 |
最低隔离,性能最好 |
| Read Committed |
不可能 |
可能 |
可能 |
大多数数据库的默认级别 |
| Repeatable Read |
不可能 |
不可能 |
可能 |
MySQL 的默认级别 |
| Serializable |
不可能 |
不可能 |
不可能 |
最高隔离,性能最差 |
设置隔离级别:
1 2 3 4 5 6 7 8 9 10 11 12 13
| db.Transaction(func(tx *gorm.DB) error { return nil }, &sql.TxOptions{ Isolation: sql.LevelReadCommitted, })
db.Exec("SET TRANSACTION ISOLATION LEVEL READ COMMITTED") db.Transaction(func(tx *gorm.DB) error { return nil })
|
隔离级别选择:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| Read Uncommitted: - 几乎不使用 - 用于性能要求极高,数据准确性要求低的场景
Read Committed (推荐): - 大多数场景的最佳选择 - 平衡了性能和一致性 - 避免脏读
Repeatable Read: - 需要事务内多次读取一致数据的场景 - 统计查询、报表生成 - MySQL 默认级别
Serializable: - 高并发写操作且不能有任何不一致 - 性能开销大 - 仅在必要时使用
|
2.3 学习方法
方法 1: 实验法
实验 1: 测试原子性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| func TestAtomicity(t *testing.T) { db.Transaction(func(tx *gorm.DB) error { user := User{Name: "Alice"} if err := tx.Create(&user).Error; err != nil { return err }
order := Order{UserID: user.ID, Amount: 100} if err := tx.Create(&order).Error; err != nil { return err }
return errors.New("intentional error") })
var count int64 db.Model(&User{}).Where("name = ?", "Alice").Count(&count) assert.Equal(t, int64(0), count) }
|
实验 2: 测试嵌套事务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| func TestNestedTransaction(t *testing.T) { var outerSuccess, innerSuccess bool
db.Transaction(func(tx1 *gorm.DB) error { tx1.Create(&User{Name: "Bob"}) outerSuccess = true
tx1.Transaction(func(tx2 *gorm.DB) error { tx2.Create(&Order{UserID: 1, Amount: 200}) innerSuccess = true
return errors.New("inner error") })
return nil })
var user User var order Order
err1 := db.First(&user, "name = ?", "Bob").Error err2 := db.First(&order).Error
assert.Nil(t, err1) assert.Error(t, err2) }
|
方法 2: 对比法
对比手动和自动事务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| tx := db.Begin() defer func() { if r := recover(); r != nil { tx.Rollback() panic(r) } }()
if err := tx.Create(&user).Error; err != nil { tx.Rollback() return err }
if err := tx.Create(&order).Error; err != nil { tx.Rollback() return err }
tx.Commit()
db.Transaction(func(tx *gorm.DB) error { if err := tx.Create(&user).Error; err != nil { return err } if err := tx.Create(&order).Error; err != nil { return err } return nil })
|
方法 3: 场景法
场景 1: 转账
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| func Transfer(fromID, toID uint, amount float64) error { return db.Transaction(func(tx *gorm.DB) error { var from Account if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}). First(&from, fromID).Error; err != nil { return err }
if from.Balance < amount { return errors.New("insufficient balance") }
var to Account if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}). First(&to, toID).Error; err != nil { return err }
from.Balance -= amount to.Balance += amount
if err := tx.Save(&from).Error; err != nil { return err }
if err := tx.Save(&to).Error; err != nil { return err }
return tx.Create(&TransactionLog{ FromAccountID: fromID, ToAccountID: toID, Amount: amount, }).Error }) }
|
场景 2: 订单创建
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| func CreateOrder(order Order, items []OrderItem) error { return db.Transaction(func(tx *gorm.DB) error { if err := tx.Create(&order).Error; err != nil { return err }
for _, item := range items { item.OrderID = order.ID
var product Product if err := tx.First(&product, item.ProductID).Error; err != nil { return err }
if product.Stock < item.Quantity { return fmt.Errorf("insufficient stock for product %d", item.ProductID) }
product.Stock -= item.Quantity if err := tx.Save(&product).Error; err != nil { return err }
if err := tx.Create(&item).Error; err != nil { return err } }
return nil }) }
|
2.4 实施策略
策略 1: 分层学习
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| 第 1 层: 理解基础概念 (Day 1) 目标: 理解事务的基本概念 内容: - ACID 特性 - Transaction API - Begin/Commit/Rollback 验证: - 能解释 ACID - 能使用 Transaction - 能手动控制事务
第 2 层: 掌握嵌套事务 (Day 2) 目标: 理解嵌套事务的实现 内容: - 保存点机制 - 嵌套事务规则 - 错误传播 验证: - 能使用嵌套事务 - 能理解保存点 - 能处理错误传播
第 3 层: 应用最佳实践 (Day 3) 目标: 掌握实际应用 内容: - 业务场景分析 - 性能优化 - 错误处理 验证: - 能设计业务事务 - 能优化性能 - 能处理错误
|
策略 2: 问题驱动
问题序列:
为什么需要事务?
- 尝试不用事务的转账
- 观察数据不一致问题
- 理解事务的价值
嵌套事务如何工作?
- 测试嵌套事务行为
- 观察保存点 SQL
- 理解回滚范围
如何优化事务性能?
如何处理错误?
- 测试不同错误场景
- 理解 panic 恢复
- 设计错误处理策略
策略 3: 验证反馈
验证点 1: 原子性验证
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| func TestTransactionAtomicity(t *testing.T) { db.Create(&User{Name: "Alice"}) db.Create(&Order{UserID: 999})
err := db.Transaction(func(tx *gorm.DB) error { tx.Create(&User{Name: "Bob"}) tx.Create(&Order{UserID: 888}) return nil })
}
|
验证点 2: 隔离性验证
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| func TestTransactionIsolation(t *testing.T) { done := make(chan bool)
go func() { db.Transaction(func(tx1 *gorm.DB) error { var user User tx1.First(&user, 1) time.Sleep(100 * time.Millisecond) user.Name = "Updated by Tx1" return tx1.Save(&user).Error }) done <- true }()
go func() { time.Sleep(50 * time.Millisecond) db.Transaction(func(tx2 *gorm.DB) error { var user User tx2.First(&user, 1) user.Name = "Updated by Tx2" return tx2.Save(&user).Error }) done <- true }()
<-done <-done
}
|
二、核心数据结构
2.1 事务相关接口
源码位置: interfaces.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| type ConnPool interface{}
type TxBeginner interface { BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) }
type ConnPoolBeginner interface { BeginTx(ctx context.Context, opts *sql.TxOptions) (ConnPool, error) }
type TxCommitter interface { Commit() error Rollback() error }
|
接口关系图:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| ┌──────────────────┐ │ ConnPool │ │ (基础连接池) │ └────────┬─────────┘ │ ┌───────────────────┼───────────────────┐ │ │ │ ▼ ▼ ▼ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ TxBeginner │ │ConnPoolBegin │ │TxCommitter │ │ BeginTx() │ │BeginTx() │ │Commit() │ │ → *sql.Tx │ │ → ConnPool │ │Rollback() │ └──────────────┘ └──────────────┘ └──────────────┘ │ │ │ └───────────────────┼───────────────────┘ │ ▼ ┌──────────────────┐ │ Tx │ │ (完整事务接口) │ │ ConnPool + │ │ TxCommitter + │ │ StmtContext │ └──────────────────┘
|
接口说明:
| 接口 |
方法 |
说明 |
返回值 |
| TxBeginner |
BeginTx |
开始事务 |
*sql.Tx |
| ConnPoolBeginner |
BeginTx |
开始事务(扩展版) |
ConnPool |
| TxCommitter |
Commit |
提交事务 |
error |
| TxCommitter |
Rollback |
回滚事务 |
error |
2.2 TxOptions 配置
源码位置: database/sql
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| type TxOptions struct { Isolation IsolationLevel ReadOnly bool }
type IsolationLevel int
const ( LevelDefault IsolationLevel = iota LevelReadUncommitted LevelReadCommitted LevelWriteCommitted LevelRepeatableRead LevelSnapshot LevelSerializable LevelLinearizable )
|
使用示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| db.Transaction(func(tx *gorm.DB) error { return nil }, &sql.TxOptions{ Isolation: sql.LevelReadCommitted, })
db.Transaction(func(tx *gorm.DB) error { var users []User return tx.Find(&users).Error }, &sql.TxOptions{ ReadOnly: true, })
|
三、事务核心实现
3.1 Transaction 函数完整实现
源码位置: finisher_api.go:632-678
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
|
func (db *DB) Transaction(fc func(tx *DB) error, opts ...*sql.TxOptions) (err error) { panicked := true
if committer, ok := db.Statement.ConnPool.(TxCommitter); ok && committer != nil { if !db.DisableNestedTransaction { spID := new(maphash.Hash).Sum64()
err = db.SavePoint(fmt.Sprintf("sp%d", spID)).Error if err != nil { return }
defer func() { if panicked || err != nil { db.RollbackTo(fmt.Sprintf("sp%d", spID)) } }() }
err = fc(db.Session(&Session{NewDB: db.clone == 1})) } else { tx := db.Begin(opts...) if tx.Error != nil { return tx.Error }
defer func() { if panicked || err != nil { tx.Rollback() } }()
if err = fc(tx); err == nil { panicked = false return tx.Commit().Error } }
panicked = false return }
|
执行流程图:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| Transaction(fc, opts): │ ├─► 1. 检查连接池类型 │ db.Statement.ConnPool.(TxCommitter) │ │ │ ├─ 是 TxCommitter? │ │ │ │ │ ├─ YES → 已在事务中 │ │ │ │ │ │ │ ├─ DisableNestedTransaction? │ │ │ │ │ │ │ │ │ ├─ NO → 创建保存点 │ │ │ │ │ ├─ SavePoint(fmt.Sprintf("sp%d", spID)) │ │ │ │ │ └─ defer RollbackTo(spX) if panic/err │ │ │ │ │ │ │ │ │ └─ 执行 fc(db) │ │ │ │ └─ 返回错误 │ │ │ │ │ │ │ └─ YES → 不创建保存点 │ │ │ └─ 直接执行 fc(db) │ │ │ │ │ └─ NO → 不在事务中 │ │ │ │ │ └─ 开始新事务 │ │ ├─ Begin(opts...) │ │ ├─ defer Rollback() if panic/err │ │ ├─ 执行 fc(tx) │ │ │ └─ if err == nil: Commit() │ │ └─ 返回错误 │ │ │ └─ panicked = false │ └─► 2. 返回错误
|
3.2 Begin 函数完整实现
源码位置: finisher_api.go:681-714
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
|
func (db *DB) Begin(opts ...*sql.TxOptions) *DB { var ( tx = db.getInstance().Session(&Session{ Context: db.Statement.Context, NewDB: db.clone == 1, }) opt *sql.TxOptions err error )
if len(opts) > 0 { opt = opts[0] }
ctx := tx.Statement.Context if db.DefaultTransactionTimeout > 0 { if _, ok := ctx.Deadline(); !ok { ctx, _ = context.WithTimeout(ctx, db.DefaultTransactionTimeout) } }
switch beginner := tx.Statement.ConnPool.(type) { case TxBeginner: tx.Statement.ConnPool, err = beginner.BeginTx(ctx, opt) case ConnPoolBeginner: tx.Statement.ConnPool, err = beginner.BeginTx(ctx, opt) default: err = ErrInvalidTransaction }
if err != nil { tx.AddError(err) }
return tx }
|
Begin 调用链:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| db.Begin(opts): │ ├─► 1. getInstance() │ └─ 创建新的DB实例,克隆配置 │ ├─► 2. Session(&Session{...}) │ ├─ Context: 复制上下文 │ └─ NewDB: 是否创建新DB │ ├─► 3. 设置超时 │ └─ WithTimeout(ctx, DefaultTransactionTimeout) │ ├─► 4. 调用 BeginTx │ ├─ TxBeginner.BeginTx() 或 │ ├─ ConnPoolBeginner.BeginTx() │ └─ 更新 ConnPool │ └─► 5. 返回 tx
|
3.3 Commit 和 Rollback 实现
源码位置: finisher_api.go:717-736
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| func (db *DB) Commit() *DB { if committer, ok := db.Statement.ConnPool.(TxCommitter); ok && committer != nil && !reflect.ValueOf(committer).IsNil() { db.AddError(committer.Commit()) } else { db.AddError(ErrInvalidTransaction) } return db }
func (db *DB) Rollback() *DB { if committer, ok := db.Statement.ConnPool.(TxCommitter); ok && committer != nil { if !reflect.ValueOf(committer).IsNil() { db.AddError(committer.Rollback()) } } else { db.AddError(ErrInvalidTransaction) } return db }
|
Commit/Rollback 流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| Commit(): │ ├─► 1. 检查 ConnPool 类型 │ └─ ConnPool.(TxCommitter) │ ├─► 2. 检查是否为nil │ └─ reflect.ValueOf(committer).IsNil() │ ├─► 3. 调用 Commit() │ └─ committer.Commit() │ └─► 4. 记录错误 └─ db.AddError(err)
Rollback(): │ ├─► 1. 检查 ConnPool 类型 │ └─ ConnPool.(TxCommitter) │ ├─► 2. 检查是否为nil │ └─ reflect.ValueOf(committer).IsNil() │ ├─► 3. 调用 Rollback() │ └─ committer.Rollback() │ └─► 4. 记录错误 └─ db.AddError(err)
|
3.4 SavePoint 和 RollbackTo 实现
源码位置: finisher_api.go:738-782
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
|
func (db *DB) SavePoint(name string) *DB { if savePointer, ok := db.Dialector.(SavePointerDialectorInterface); ok { var ( preparedStmtTx *PreparedStmtTX isPreparedStmtTx bool )
if preparedStmtTx, isPreparedStmtTx = db.Statement.ConnPool.(*PreparedStmtTX); isPreparedStmtTx { db.Statement.ConnPool = preparedStmtTx.Tx }
db.AddError(savePointer.SavePoint(db, name))
if isPreparedStmtTx { db.Statement.ConnPool = preparedStmtTx } } else { db.AddError(ErrUnsupportedDriver) } return db }
func (db *DB) RollbackTo(name string) *DB { if savePointer, ok := db.Dialector.(SavePointerDialectorInterface); ok { var ( preparedStmtTx *PreparedStmtTX isPreparedStmtTx bool )
if preparedStmtTx, isPreparedStmtTx = db.Statement.ConnPool.(*PreparedStmtTX); isPreparedStmtTx { db.Statement.ConnPool = preparedStmtTx.Tx }
db.AddError(savePointer.RollbackTo(db, name))
if isPreparedStmtTx { db.Statement.ConnPool = preparedStmtTx } } else { db.AddError(ErrUnsupportedDriver) } return db }
|
保存点操作流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| SavePoint(name): │ ├─► 1. 检查 Dialector 支持 │ └─ db.Dialector.(SavePointerDialectorInterface) │ ├─► 2. 处理预编译语句 │ └─ if ConnPool 是 PreparedStmtTX: │ ├─ 临时切换到 Tx │ └─ db.Statement.ConnPool = preparedStmtTx.Tx │ ├─► 3. 执行保存点SQL │ └─ savePointer.SavePoint(db, name) │ 实际执行: SAVEPOINT name │ ├─► 4. 恢复预编译语句 │ └─ db.Statement.ConnPool = preparedStmtTx │ └─► 5. 返回db
RollbackTo(name): │ ├─► 1. 检查 Dialector 支持 │ └─ db.Dialector.(SavePointerDialectorInterface) │ ├─► 2. 处理预编译语句 │ └─ if ConnPool 是 PreparedStmtTX: │ ├─ 临时切换到 Tx │ └─ db.Statement.ConnPool = preparedStmtTx.Tx │ ├─► 3. 执行回滚SQL │ └─ savePointer.RollbackTo(db, name) │ 实际执行: ROLLBACK TO SAVEPOINT name │ ├─► 4. 恢复预编译语句 │ └─ db.Statement.ConnPool = preparedStmtTx │ └─► 5. 返回db
|
SQL 执行示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| BEGIN;
SAVEPOINT sp1; INSERT INTO users (name) VALUES ('Alice');
SAVEPOINT sp2; INSERT INTO orders (user_id, amount) VALUES (1, 100);
ROLLBACK TO SAVEPOINT sp1;
INSERT INTO posts (title) VALUES ('Hello');
COMMIT;
|
3.5 事务执行流程图
完整事务生命周期:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| 用户调用 db.Transaction(fc): │ ├─► [阶段 1] 初始化检查 │ ├─ 检查 ConnPool 类型 │ └─ 决定执行路径 │ ├─► [阶段 2a] 嵌套事务路径 (已在事务中) │ │ │ ├─ 创建保存点 │ │ └─ SAVEPOINT spXXX │ │ │ ├─ 设置 defer 回滚 │ │ └─ defer { if panic || err: ROLLBACK TO spXXX } │ │ │ ├─ 执行用户函数 │ │ └─ fc(db) │ │ ├─ 成功 → 释放保存点 │ │ └─ 失败 → defer 回滚 │ │ │ └─ 返回错误 │ ├─► [阶段 2b] 新建事务路径 (不在事务中) │ │ │ ├─ 开始事务 │ │ └─ BEGIN │ │ │ ├─ 设置 defer 回滚 │ │ └─ defer { if panic || err: ROLLBACK } │ │ │ ├─ 执行用户函数 │ │ └─ fc(tx) │ │ ├─ 成功 → COMMIT │ │ └─ 失败 → defer 回滚 │ │ │ └─ 返回错误 │ └─► [阶段 3] 返回结果 └─ err (nil 或 error)
|
四、默认事务机制
4.1 BeginTransaction 实现
源码位置: callbacks/transaction.go:7-18
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
func BeginTransaction(db *gorm.DB) { if !db.Config.SkipDefaultTransaction && db.Error == nil { if tx := db.Begin(); tx.Error == nil { db.Statement.ConnPool = tx.Statement.ConnPool
db.InstanceSet("gorm:started_transaction", true) } else if tx.Error == gorm.ErrInvalidTransaction { tx.Error = nil } else { db.Error = tx.Error } } }
|
执行流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| BeginTransaction(db): │ ├─► 1. 检查 SkipDefaultTransaction │ ├─ true → 跳过,不开始事务 │ └─ false → 继续 │ ├─► 2. 检查 db.Error │ ├─ 非 nil → 跳过 │ └─ nil → 继续 │ ├─► 3. 调用 Begin() │ └─ tx := db.Begin() │ ├─► 4. 处理结果 │ ├─ tx.Error == nil │ │ ├─ 更新 ConnPool │ │ └─ 设置 started_transaction 标记 │ │ │ └─ tx.Error != nil │ ├─ ErrInvalidTransaction │ │ └─ 忽略错误(不支持事务) │ └─ 其他错误 │ └─ 设置到 db.Error │ └─► 5. 返回
|
4.2 CommitOrRollbackTransaction 实现
源码位置: callbacks/transaction.go:20-32
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
|
func CommitOrRollbackTransaction(db *gorm.DB) { if !db.Config.SkipDefaultTransaction { if _, ok := db.InstanceGet("gorm:started_transaction"); ok { if db.Error != nil { db.Rollback() } else { db.Commit() }
db.Statement.ConnPool = db.ConnPool } } }
|
执行流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| CommitOrRollbackTransaction(db): │ ├─► 1. 检查 SkipDefaultTransaction │ ├─ true → 跳过 │ └─ false → 继续 │ ├─► 2. 检查 started_transaction 标记 │ ├─ 不存在 → 跳过(事务不是由GORM开始的) │ └─ 存在 → 继续 │ ├─► 3. 检查 db.Error │ ├─ 非 nil → Rollback() │ └─ nil → Commit() │ ├─► 4. 恢复连接池 │ └─ db.Statement.ConnPool = db.ConnPool │ └─► 5. 返回
|
4.3 回调注册流程
源码位置: callbacks/callbacks.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| func RegisterDefaultCallbacks(db *gorm.DB, config *Config) { createCallback := db.Callback().Create()
createCallback.Match(enableTransaction).Register("gorm:begin_transaction", BeginTransaction)
createCallback.Register("gorm:create", Create(config))
createCallback.Register("gorm:commit_or_rollback_transaction", CommitOrRollbackTransaction)
updateCallback := db.Callback().Update()
updateCallback.Match(enableTransaction).Register("gorm:begin_transaction", BeginTransaction) updateCallback.Register("gorm:update", Update(config)) updateCallback.Register("gorm:commit_or_rollback_transaction", CommitOrRollbackTransaction)
deleteCallback := db.Callback().Delete()
deleteCallback.Match(enableTransaction).Register("gorm:begin_transaction", BeginTransaction) deleteCallback.Register("gorm:delete", Delete(config)) deleteCallback.Register("gorm:commit_or_rollback_transaction", CommitOrRollbackTransaction) }
func enableTransaction(db *gorm.DB) bool { return !db.SkipDefaultTransaction }
|
回调执行顺序:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| Create(&user) 执行流程: │ ├─► [before:create 阶段] │ │ │ ├─ gorm:begin_transaction │ │ ├─ 检查 SkipDefaultTransaction │ │ ├─ 调用 Begin() │ │ ├─ 更新 ConnPool │ │ └─ 设置 started_transaction 标记 │ │ │ └─ 其他 before 回调... │ ├─► [create 阶段] │ │ │ ├─ gorm:create │ │ ├─ 构建 SQL │ │ ├─ 执行 INSERT │ │ └─ 设置 db.Error │ │ │ └─ 其他 create 回调... │ ├─► [after:create 阶段] │ │ │ └─ gorm:commit_or_rollback_transaction │ ├─ 检查 started_transaction 标记 │ ├─ 检查 db.Error │ ├─ 有错误 → Rollback() │ ├─ 无错误 → Commit() │ └─ 恢复 ConnPool │ └─► 返回结果
|
4.4 默认事务流程图
Create 操作的完整事务流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| db.Create(&user) 调用: │ ├─► 1. 进入 Create 回调链 │ ├─► 2. before:create 阶段 │ │ │ ├─► gorm:begin_transaction │ │ │ │ │ ├─ SkipDefaultTransaction? │ │ │ ├─ YES → 跳过 │ │ │ └─ NO → 继续 │ │ │ │ │ ├─ db.Error == nil? │ │ │ ├─ NO → 跳过 │ │ │ └─ YES → 继续 │ │ │ │ │ ├─ db.Begin() │ │ │ ├─ 调用 BeginTx() │ │ │ └─ 获取 *sql.Tx │ │ │ │ │ ├─ 更新 ConnPool │ │ │ └─ db.Statement.ConnPool = tx.ConnPool │ │ │ │ │ └─ 设置标记 │ │ └─ db.InstanceSet("gorm:started_transaction", true) │ │ │ └─ 其他 before 回调... │ ├─► 3. create 阶段 │ │ │ ├─► gorm:create │ │ │ │ │ ├─ 构建 INSERT SQL │ │ │ │ │ ├─ 执行 SQL │ │ │ ├─ 成功 → 继续 │ │ │ └─ 失败 → 设置 db.Error │ │ │ │ │ └─ 返回 │ │ │ └─ 其他 create 回调... │ ├─► 4. after:create 阶段 │ │ │ └─► gorm:commit_or_rollback_transaction │ │ │ ├─ SkipDefaultTransaction? │ │ ├─ YES → 跳过 │ │ └─ NO → 继续 │ │ │ ├─ started_transaction 存在? │ │ ├─ NO → 跳过 │ │ └─ YES → 继续 │ │ │ ├─ 检查 db.Error │ │ │ │ │ ├─ 有错误 → Rollback() │ │ │ ├─ 调用 tx.Rollback() │ │ │ └─ 回滚事务 │ │ │ │ │ └─ 无错误 → Commit() │ │ ├─ 调用 tx.Commit() │ │ └─ 提交事务 │ │ │ └─ 恢复 ConnPool │ └─ db.Statement.ConnPool = db.ConnPool │ └─► 5. 返回结果
|
五、实战代码示例
5.1 基础事务使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| package main
import ( "fmt" "gorm.io/driver/mysql" "gorm.io/gorm" )
type User struct { ID uint Name string Email string }
type Account struct { ID uint UserID uint Balance float64 }
func main() { dsn := "user:password@tcp(127.0.0.1:3306)/dbname?charset=utf8mb4&parseTime=True&loc=Local" db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{}) if err != nil { panic(err) }
db.AutoMigrate(&User{}, &Account{})
err = db.Transaction(func(tx *gorm.DB) error { user := User{Name: "Alice", Email: "alice@example.com"} if err := tx.Create(&user).Error; err != nil { return err }
account := Account{ UserID: user.ID, Balance: 1000.0, } if err := tx.Create(&account).Error; err != nil { return err }
fmt.Printf("用户 %s 创建成功,账户余额: %.2f\n", user.Name, account.Balance) return nil })
if err != nil { fmt.Printf("事务失败: %v\n", err) } }
|
5.2 嵌套事务使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
| package main
import ( "errors" "fmt" "gorm.io/driver/mysql" "gorm.io/gorm" )
type User struct { ID uint Name string }
type Order struct { ID uint UserID uint Amount float64 }
type Log struct { ID uint UserID uint Action string }
func main() { dsn := "user:password@tcp(127.0.0.1:3306)/dbname?charset=utf8mb4&parseTime=True&loc=Local" db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{}) if err != nil { panic(err) }
db.AutoMigrate(&User{}, &Order{}, &Log{})
err = db.Transaction(func(tx1 *gorm.DB) error { fmt.Println("外层事务开始")
user := User{Name: "Bob"} if err := tx1.Create(&user).Error; err != nil { return err } fmt.Printf("创建用户: %s (ID: %d)\n", user.Name, user.ID)
err = tx1.Transaction(func(tx2 *gorm.DB) error { fmt.Println(" 内层事务 1 开始")
order := Order{ UserID: user.ID, Amount: 100.0, } if err := tx2.Create(&order).Error; err != nil { return err } fmt.Printf(" 创建订单: 金额 %.2f\n", order.Amount)
if order.Amount > 50 { fmt.Println(" 内层事务 1 回滚(金额过大)") return errors.New("amount too large") } return nil })
if err != nil { fmt.Printf(" 内层事务 1 失败: %v\n", err) }
err = tx1.Transaction(func(tx2 *gorm.DB) error { fmt.Println(" 内层事务 2 开始")
log := Log{ UserID: user.ID, Action: "created", } if err := tx2.Create(&log).Error; err != nil { return err } fmt.Printf(" 创建日志: %s\n", log.Action) return nil })
if err != nil { return err }
fmt.Println("外层事务提交") return nil })
if err != nil { fmt.Printf("事务失败: %v\n", err) } }
|
5.3 手动事务控制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| package main
import ( "fmt" "gorm.io/driver/mysql" "gorm.io/gorm" )
func main() { dsn := "user:password@tcp(127.0.0.1:3306)/dbname?charset=utf8mb4&parseTime=True&loc=Local" db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{}) if err != nil { panic(err) }
tx := db.Begin() defer func() { if r := recover(); r != nil { tx.Rollback() panic(r) } }()
if err := tx.Create(&User{Name: "Charlie"}).Error; err != nil { tx.Rollback() fmt.Printf("创建用户失败: %v\n", err) return }
if err := tx.Create(&Order{Amount: 200}).Error; err != nil { tx.Rollback() fmt.Printf("创建订单失败: %v\n", err) return }
if err := tx.Commit().Error; err != nil { fmt.Printf("提交事务失败: %v\n", err) return }
fmt.Println("事务提交成功") }
|
5.4 保存点使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| package main
import ( "fmt" "gorm.io/driver/mysql" "gorm.io/gorm" )
func main() { dsn := "user:password@tcp(127.0.0.1:3306)/dbname?charset=utf8mb4&parseTime=True&loc=Local" db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{}) if err != nil { panic(err) }
db.Transaction(func(tx1 *gorm.DB) error { fmt.Println("外层事务开始")
user := User{Name: "David"} if err := tx1.Create(&user).Error; err != nil { return err }
if err := tx1.SavePoint("sp1").Error; err != nil { return err }
order := Order{UserID: user.ID, Amount: 300} if err := tx1.Create(&order).Error; err != nil { fmt.Println("创建订单失败,回滚到保存点 sp1") tx1.RollbackTo("sp1") }
log := Log{UserID: user.ID, Action: "completed"} if err := tx1.Create(&log).Error; err != nil { return err }
fmt.Println("外层事务提交") return nil }) }
|
5.5 隔离级别配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| package main
import ( "context" "database/sql" "fmt" "time" "gorm.io/driver/mysql" "gorm.io/gorm" )
func main() { dsn := "user:password@tcp(127.0.0.1:3306)/dbname?charset=utf8mb4&parseTime=True&loc=Local" db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{}) if err != nil { panic(err) }
err = db.Transaction(func(tx *gorm.DB) error { fmt.Println("使用 Read Committed 隔离级别") var users []User return tx.Find(&users).Error }, &sql.TxOptions{ Isolation: sql.LevelReadCommitted, })
err = db.Transaction(func(tx *gorm.DB) error { fmt.Println("使用 Repeatable Read 隔离级别") var users []User return tx.Find(&users).Error }, &sql.TxOptions{ Isolation: sql.LevelRepeatableRead, })
err = db.Transaction(func(tx *gorm.DB) error { fmt.Println("使用 Serializable 隔离级别") var users []User return tx.Find(&users).Error }, &sql.TxOptions{ Isolation: sql.LevelSerializable, })
err = db.Transaction(func(tx *gorm.DB) error { fmt.Println("使用只读事务") var users []User return tx.Find(&users).Error }, &sql.TxOptions{ ReadOnly: true, })
db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{ DefaultTransactionTimeout: 10 * time.Second, }) if err != nil { panic(err) }
err = db.Transaction(func(tx *gorm.DB) error { var users []User return tx.Find(&users).Error }) }
|
5.6 复杂业务场景
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
| package main
import ( "errors" "fmt" "gorm.io/driver/mysql" "gorm.io/gorm" "gorm.io/gorm/clause" )
type TransferResult struct { Success bool Message string }
func Transfer(db *gorm.DB, fromID, toID uint, amount float64) (*TransferResult, error) { result := &TransferResult{Success: false}
err := db.Transaction(func(tx *gorm.DB) error { var from Account if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}). First(&from, fromID).Error; err != nil { result.Message = "转出账户不存在" return err }
if from.Balance < amount { result.Message = "余额不足" return errors.New("insufficient balance") }
var to Account if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}). First(&to, toID).Error; err != nil { result.Message = "转入账户不存在" return err }
from.Balance -= amount if err := tx.Save(&from).Error; err != nil { result.Message = "更新转出账户失败" return err }
to.Balance += amount if err := tx.Save(&to).Error; err != nil { result.Message = "更新转入账户失败" return err }
log := TransactionLog{ FromAccountID: fromID, ToAccountID: toID, Amount: amount, Status: "completed", } if err := tx.Create(&log).Error; err != nil { result.Message = "记录日志失败" return err }
result.Success = true result.Message = fmt.Sprintf("转账成功: %.2f", amount) return nil })
return result, err }
func CreateOrderWithItems(db *gorm.DB, order Order, items []OrderItem) error { return db.Transaction(func(tx *gorm.DB) error { if err := tx.Create(&order).Error; err != nil { return err }
for i, item := range items { item.OrderID = order.ID
var product Product if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}). First(&product, item.ProductID).Error; err != nil { return fmt.Errorf("产品 %d 不存在", item.ProductID) }
if product.Stock < item.Quantity { return fmt.Errorf("产品 %s 库存不足", product.Name) }
product.Stock -= item.Quantity if err := tx.Save(&product).Error; err != nil { return err }
if err := tx.Create(&item).Error; err != nil { return err }
fmt.Printf("订单项 %d: %s x %d\n", i+1, product.Name, item.Quantity) }
fmt.Printf("订单 %d 创建成功\n", order.ID) return nil }) }
|
六、最佳实践与故障排查
6.1 配置最佳实践
生产环境配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{ SkipDefaultTransaction: false,
DefaultTransactionTimeout: 30 * time.Second,
PrepareStmt: true, })
db.Session(&gorm.Session{ SkipDefaultTransaction: true, }).CreateInBatches(records, 100)
|
事务使用原则
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| db.Transaction(func(tx *gorm.DB) error { return tx.Create(&user).Error })
db.Transaction(func(tx *gorm.DB) error { if err := tx.Create(&user).Error; err != nil { return err }
resp, err := http.Get("http://api.example.com")
return nil })
resp, err := http.Get("http://api.example.com") if err != nil { return err }
db.Transaction(func(tx *gorm.DB) error { return tx.Create(&user).Error })
|
6.2 常见问题与解决方案
问题 1: 事务超时
症状: 事务执行时间过长导致超时
1 2 3 4 5 6 7 8 9 10 11 12 13
| db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{ DefaultTransactionTimeout: 5 * time.Minute, })
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel()
err = db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { return nil })
|
问题 2: 死锁
症状: 事务互相等待导致死锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
|
func Transfer(db *gorm.DB, id1, id2 uint, amount float64) error { return db.Transaction(func(tx *gorm.DB) error { if id1 > id2 { id1, id2 = id2, id1 }
var acc1 Account if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}). First(&acc1, id1).Error; err != nil { return err }
var acc2 Account if err := tx.Clauses(clause(clause.Locking{Strength: "UPDATE"}). First(&acc2, id2).Error; err != nil { return err }
acc1.Balance -= amount acc2.Balance += amount tx.Save(&acc1) tx.Save(&acc2)
return nil }) }
|
问题 3: 嵌套事务回滚影响外层
症状: 内层事务回滚导致外层也回滚
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| db.Transaction(func(tx1 *gorm.DB) error { tx1.Create(&User{Name: "Alice"})
tx2 := db.Begin() defer func() { if r := recover(); r != nil { tx2.Rollback() panic(r) } }()
tx2.Create(&Order{UserID: 1, Amount: 100}) if someError { tx2.Rollback() } else { tx2.Commit() }
return nil })
db.Session(&gorm.Session{DisableNestedTransaction: true}).Transaction(...)
|
问题 4: 隔离级别导致的并发问题
症状: 脏读、幻读、不可重复读
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
db.Transaction(func(tx *gorm.DB) error { var users []User return tx.Find(&users).Error }, &sql.TxOptions{ Isolation: sql.LevelReadCommitted, })
db.Transaction(func(tx *gorm.DB) error { var count int64 tx.Model(&User{}).Count(&count) tx.Model(&User{}).Count(&count) return nil }, &sql.TxOptions{ Isolation: sql.LevelRepeatableRead, })
db.Transaction(func(tx *gorm.DB) error { return nil }, &sql.TxOptions{ Isolation: sql.LevelSerializable, })
|
问题 5: 预编译语句与保存点冲突
症状: 使用 Preload 时嵌套事务报错
1 2 3 4 5 6 7 8 9 10 11 12 13
|
db.Session(&gorm.Session{PrepareStmt: false}).Transaction(func(tx *gorm.DB) error { tx.Transaction(func(tx2 *gorm.DB) error { return nil }) return nil })
db.Session(&gorm.Session{DisableNestedTransaction: true}).Transaction(...)
|
七、学习验证
7.1 知识自测
基础题
GORM 事务的默认行为是什么?
- A) Create/Update/Delete 自动使用事务
- B) 需要显式调用 Begin()
- C) 默认不使用事务
- D) 只在嵌套时使用事务
Transaction 函数如何处理 panic?
- A) 忽略 panic
- B) 自动回滚并返回错误
- C) 直接崩溃
- D) 需要手动 recover
嵌套事务使用什么机制实现?
- A) 真正的嵌套事务
- B) 保存点
- C) 独立事务
- D) 不支持嵌套
如何跳过默认事务?
- A) 设置 SkipDefaultTransaction = true
- B) 使用 Begin() 手动控制
- C) 使用 Session 配置
- D) 以上都是
Commit 什么时候被调用?
- A) Transaction 函数返回 nil
- B) Transaction 函数返回 error
- C) 发生 panic
- D) 手动调用
进阶题
以下代码的执行结果是什么?
1 2 3 4 5 6 7 8 9 10
| db.Transaction(func(tx1 *gorm.DB) error { tx1.Create(&User{Name: "A"})
tx1.Transaction(func(tx2 *gorm.DB) error { tx2.Create(&Order{Amount: 100}) return errors.New("error") })
return nil })
|
- A) 用户和订单都存在
- B) 用户存在,订单不存在
- C) 用户和订单都不存在
- D) 抛出异常
DisableNestedTransaction 的作用是什么?
- A) 禁用所有事务
- B) 内层事务不使用保存点
- C) 跳过默认事务
- D) 禁用事务超时
SavePoint 和 RollbackTo 的关系是?
- A) SavePoint 创建回滚点,RollbackTo 回滚到该点
- B) RollbackTo 创建回滚点,SavePoint 回滚
- C) 两者独立,没有关系
- D) SavePoint 提交,RollbackTo 回滚
BeginTransaction 在哪个阶段执行?
- A) before:create/update/delete
- B) create/update/delete
- C) after:create/update/delete
- D) 任何时候
如何实现读写分离的事务?
- A) 使用不同的连接池
- B) 使用不同的隔离级别
- C) 使用只读事务
- D) 以上都是
7.2 实践练习
练习 1: 实现转账功能
需求: 实现一个安全的转账功能,包括余额检查、锁定、日志记录。
验收标准:
练习 2: 实现订单创建
需求: 实现订单创建功能,包括库存扣减、订单项创建。
验收标准:
练习 3: 实现嵌套事务场景
需求: 实现一个包含多层嵌套的业务场景。
验收标准:
三、学习路径建议
3.1 前置知识检查
| 知识点 |
要求 |
检验方式 |
| SQL 事务 |
理解 BEGIN/COMMIT/ROLLBACK |
能写出基本的事务 SQL |
| ACID |
理解四个特性 |
能解释每个特性 |
| 隔离级别 |
了解不同级别的区别 |
能对比 Read Committed vs Serializable |
| 并发控制 |
了解锁的概念 |
能解释死锁 |
3.2 学习时间分配
| 内容 |
理论 |
实践 |
产出 |
| Day 1: 事务基础 |
2h |
1.5h |
简单事务示例 |
| Day 2: 嵌套事务 |
1.5h |
2h |
嵌套事务测试 |
| Day 3: 实际应用 |
1.5h |
2h |
业务场景实现 |
3.3 学习成果验收
理论验收:
实践验收:
综合验收: