此实验由golang写的。专附了一节课讲go。
为什么用go:
- 线程安全
- 垃圾回收类型语言,不用担心回收
闭包 closures & Goroutine
package main
import "sync"
func main(){
var wg sync.WaitGroup
for i := 0, i < 5; i ++ {
wg.Add(i)
// 使用goroutine,定义匿名函数
go func(x int) {
sendRPC(x)
wg.Done() // 匿名函数可以修改包外变量
}(i) // 意思是将i当作匿名函数的参数传给x
}
wg.Wait()
}
func sendRPC(i int){
println(i) // 这里打印出来的是随机排列的数字0-4
// 如果上面的匿名函数不传参数like:
// go func() {sendRPC(i)}()
// 就会打印45555
// 因为当运行sendRPC时,for循环已经改变了i的值
}
方便发送rpc。 当需要candidates投票选leader时,需要从所有follower那里投票,而不是one by one。
rpc是阻塞操作。
or leader想给所有follower发送追加内容,也适合用并行而不是串行。
// ———————————— 想启动一个raft,定期发送heartbreak,然后raft调用.kill, 就要杀死所有gorounine
import "time"
import "sync"
var done bool
var mu sync.Mutex
func main(){
time.Sleep(1 * time.Second)
println("started")
go periodic() // 无限循环的goroutine
time.Sleep(5 * time.Second) // 等待一段时间就把全局变量done设为true
// 因为done为共享变量,会被多个线程修改。所以改时要加锁。下锁同。
mu.Lock()
done = true
mu.Unlock()
println("cancelled")
time.Sleep(3 * time.Second)
}
func periodic() {
for {
println("tick")
time.Sleep(1 * time.Second)
mu.Lock() // 为了保证可以观察到这个done已经写成true了
if done { // 如果done为true就结束这个goroutine
return
}
mu.Unlock()
}
}
如果a已经用锁了,b要是再要锁,b就会阻塞,直到a释放锁。
如果main函数(线程)结束了(线程资源被释放),它启动的所有goroutine(协程,用的是线程资源)也都会结束。
func main(){
// 银行业务 俩人各1万
alice := 10000
bob := 10000
var mu sync.Mutex
total := alice + bob
// 转钱
go func() {
for i := 0; i < 1000; i++ {
// 如果这里不加锁,x个协程运行,可能会破坏其他协程,使最终结果不为x
mu.Lock()
defer mu.Unlock()
// 如果这里解锁又加锁,就无法保证原子性。这里sum需要是不变量,锁需要可以保护不变量。
alice -= 1
bob += 1
}
}()
go func() {
for i := 0; i < 1000; i++ {
mu.Lock()
defer mu.Unlock()
alice += 1
bob -= 1
}
}()
start := time.Now()
for time.Since(start) < 1 * time.Second {
mu.Lock()
if alice + bob != total {
fmt.Printf("observed viloation, alice = %v, bob = %v, sum = %v\n, alice, bob, alice + bob)
}
mu.Unlock()
}
}
condition variable 条件变量
raft peer变成一个candidate, 想给所有follower发投票请求,follower返回信息给candidate,表示它有无投票。 这里询问应该用并行而不是串行,而且不用问完,只要超过一半票数就可以停了。
cond:通过内再变化改变外部的条件判断。 cond.signle提高效率尽量不用
func main(){
rand.Seed(time.Now().UnixNano())
count := 0
finished := 0 //得到响应的数量
var mu sync.Mutex
// 解决busy waiting的好方法:cond
cond := sync.NewCond(&mu) // 1. 锁的指针传给cond
for i := 0; i < 10; i++{
go func(){
vote := requestVote()
mu.Lock() // 2.加锁
if vote {
count++
}
finished++
// 必须在有锁的条件下才能调用cond.
cond.Broadcast() // 3.它唤醒在cond.wait()处等待的线程
mu.Unlock() // 4.然后释放锁
}()
}
mu.Lock()
//for {
// !! busy waiting 反复检查一个条件,反复抢锁cpu被榨干
// mu.Lock()
// if count < 5 && finished != 10 {
// break
// }
// mu.Unlock()
// // 解决busy waiting的一个方法:但是50是一个定值
// // time.Sleep(50 * time.Millisecond)
//}
for count < 5 && finished != 10 { // 1. 检查条件 4.有锁时回到条件检查
// 2.如果条件为false,且有锁-> 3
cond.Wait() // 3.调用wait,以原子方式释放锁,并将锁放入等待线程列表
}
// 接4 所以这里是有锁的
if count >= 5 {
println("received 5+ vote!")
} else {
println("lost")
}
mu.Unlock()
}
channel
对数量小的场景有用 (unbuffer的): channel没有内部存储空间 channel是同步的,如果有人发了但没人接,就会阻塞 channel不能用于单个goroutine上,必须有另一个goroutine同一时刻做相反操作
buffer的: channel在没填满时是异步,填满了就和unbuffer一样
func main() {
c := make(chan int)
for i := 0; i < 4; i++ {
go doWork(c)
}
for {
v := <-c
println(v)
}
}
func doWork(c chan int) {
for {
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
c <- rand.Int()
}
}
// 相当于:
func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1) // 对内部counter +1
// 在go的外部调用wg,先执行add(1)再执行go
// 所以done永远在add之后调用
go func(x int) {
sendRPC(x)
wg.Done()
}(i)
}
wg.Wait() // 调用wait,它会等到直到done被调用的次数与add被调用的次数相同
}
func sendRPC(i int) {
println(i)
}