sync.Pool临时对象池
关于 pool 的由来可以参考:
sync.Pool 的作用及为什么要用到它
Rob Pike 扩展了sync.pool 类型的文档,并且将其目的描述得更清楚:
Pool设计用意是在全局变量里维护的释放链表,尤其是被多个 goroutine 同时访问的全局变量。使用Pool代替自己写的释放链表,可以让程序运行的时候,在恰当的场景下从池里重用某项值。sync.Pool一种合适的方法是,为临时缓冲区创建一个池,多个客户端使用这个缓冲区来共享全局资源。另一方面,如果释放链表是某个对象的一部分,并由这个对象维护,而这个对象只由一个客户端使用,在这个客户端工作完成后释放链表,那么用Pool实现这个释放链表是不合适的。
“临时对象”的意思是:不需要持久使用的某一类值。
这类值对于程序来说可有可无,但如果有的话会明显更好。它们的创建和销毁可以在任何时候发生,并且完全不会影响到程序的功能。
sync.Pool主要是为了重用对象,一方面缩短了申请空间的时间,另一方面,还减轻了GC的压力。
不过它是一个临时对象池,为什么这么说呢?因为对象池中的对象会被GC回收。所以说,有状态的对象,比如数据库连接是不能够用sync.Pool来实现的。
gc触发的时机:2分钟或者内存占用达到一个阈值(当前堆内存占用是上次gc后对内存占用的两倍,当GOGC=100时)
Pool 类型数据结构及其两个方法
如下是 pool 类型的数据结构:
type Pool struct {
noCopy noCopy
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal ;[P]poolLocal 数组指针
localSize uintptr // size of the local array,数组大小
// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
New func() interface{}
}
// Local per-P Pool appendix.
type poolLocalInternal struct {
private interface{} // Can be used only by the respective P.;私有缓存区
shared []interface{} // Can be used by any P.;公共缓存区
Mutex // Protects shared.
}
type poolLocal struct {
poolLocalInternal
// Prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
Pool是提供给外部使用的对象。其中的local成员的真实类型是一个poolLocal数组,localSize是数组长度。poolLocal是真正保存数据的地方。priveate保存了一个临时对象,shared是保存临时对象的数组。
为什么Pool中需要这么多poolLocal对象呢?实际上,Pool是给每个线程分配了一个poolLocal对象。也就是说local数组的长度,就是工作线程的数量(size := runtime.GOMAXPROCS(0))。当多线程在并发读写的时候,通常情况下都是在自己线程的poolLocal中存取数据。当自己线程的poolLocal中没有数据时,才会尝试加锁去其他线程的poolLocal中“偷”数据。
sync.Pool 类型只有两个方法——Put 和 Get。
Put 是将临时对象放回当前池中,用于存放的作用。
Get 用于从当前的池中获取临时对象,返回一个Interface{}。
func (p *Pool) Put(x interface{}){
......
}
func (p *Pool) Get() interface{}{
......
}
其中特别说一下这个Get方法,它是去池中获取临时对象,从池中选择一个任意项,将其从池中删除,然后将其返回给调用者。
所以 Get 把返回的对象从池子里面删除。所以用完了的对象,还是得重新放回池子。
如果我们在使用Get申请新对象时pool中没有可用的对象,那么就会返回nil,除非设置了sync.Pool的New func。
我们来看看具体Get的源码解释:
func (p *Pool) Get() interface{} {
if race.Enabled {
race.Disable()
}
l := p.pin()
x := l.private
l.private = nil
runtime_procUnpin()
if x == nil {
l.Lock()
last := len(l.shared) - 1
if last >= 0 {
x = l.shared[last]
l.shared = l.shared[:last]
}
l.Unlock()
if x == nil {
x = p.getSlow()
}
}
if race.Enabled {
race.Enable()
if x != nil {
race.Acquire(poolRaceAddr(x))
}
}
if x == nil && p.New != nil {
x = p.New()
}
return x
}
func (p *Pool) getSlow() (x interface{}) {
// See the comment in pin regarding ordering of the loads.
size := atomic.LoadUintptr(&p.localSize) // load-acquire
local := p.local // load-consume
// Try to steal one element from other procs.
pid := runtime_procPin()
runtime_procUnpin()
for i := 0; i < int(size); i++ {
l := indexLocal(local, (pid+i+1)%int(size))
l.Lock()
last := len(l.shared) - 1
if last >= 0 {
x = l.shared[last]
l.shared = l.shared[:last]
l.Unlock()
break
}
l.Unlock()
}
return x
}
- Get()获取对象时:优先从 private 空间获取 -> 没有则加锁从 share 空间获取 ( 从尾部开始获取)-> 没有再 new func 新的对象 (此对象不会放回池中)直接返回给调用者
为什么这里要锁住。答案在getSlow中。因为当shared中没有数据的时候,会尝试去其他的poolLocal的shared中偷数据。
- 注意:Get 操作后 (在返回之前就会将它从池中删除),缓存对象彻底与 Pool 失去引用关联,需要自行 Put 放回。
放回到池中的对象会被GC回收。
但当你Get 的时候,是任意获取对象的,也就是说有可能是当前池中被放回的资源,也有可能是最后被New 出来的资源。
也就是说我们不能对从对象池申请到的对象值做任何假设,可能是New新生成的,可能是被某个协程修改过放回来的这当中会被入坑。
在来看看Put具体源码解释:
// Put adds x to the pool.
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
if race.Enabled {
if fastrand()%4 == 0 {
// Randomly drop x on floor.
return
}
race.ReleaseMerge(poolRaceAddr(x))
race.Disable()
}
l := p.pin()
if l.private == nil {
l.private = x
x = nil
}
runtime_procUnpin()
if x != nil {
l.Lock()
l.shared = append(l.shared, x)
l.Unlock()
}
if race.Enabled {
race.Enable()
}
}
Put() 优先放入 private 空间 -> 其次再考虑 share 空间
Pool 高效的设计的地方就在于将数据分散在了各个真正并发的线程中,每个线程优先从自己的poolLocal中获取数据,很大程度上降低了锁竞争。
sync.Pool实际调用:
1. 最简单的数据存放:
package main
import (
"fmt"
"sync"
)
func main() {
p:=&sync.Pool{
New: func() interface{}{
return 0
},
}
p.Put("jiangzhou")
p.Put(123456)
fmt.Println(p.Get())
fmt.Println(p.Get())
fmt.Println(p.Get())
}
输出:
jiangzhou
123456
0
2.临时使用一些大型结构体,可以用Pool来减少GC。
package main
import (
"sync"
"time"
"fmt"
)
type structR6 struct {
B1 [100000]int
}
var r6Pool = sync.Pool{
New: func() interface{} {
return new(structR6)
},
}
func usePool() {
startTime := time.Now()
for i := 0; i < 10000; i++ {
sr6 := r6Pool.Get().(*structR6)
sr6.B1[0] = 0
r6Pool.Put(sr6)
}
fmt.Println("pool Used:", time.Since(startTime))
}
func standard() {
startTime := time.Now()
for i := 0; i < 10000; i++ {
var sr6 structR6
sr6.B1[0] = 0
}
fmt.Println("standard Used:", time.Since(startTime))
}
func main() {
standard()
usePool()
}
输出:
standard Used: 263.24691ms
pool Used: 733.61µs
很明显,运用临时池存放再调用要省事的多。
一个含有100000个int值的结构体,在标准方法中,每次均新建,重复10000次,一共需要耗费263.24691ms;
如果用完的struct可以废物利用,放回pool中。需要新的结构体的时候,尝试去pool中取,而不是重新生成,重复10000次仅需要733.61µs。注意单位哦。
这样简单的操作,却节约了99.75%的时间,也节约了各方面的资源。最重要的是它可以有效减少GC CPU和GC Pause。
func main() {
//我们创建一个Pool,并实现New()函数
sp := sync.Pool{
//New()函数的作用是当我们从Pool中Get()对象时,如果Pool为空,则先通过New创建一个对象,插入Pool中,然后返回对象。
New: func() interface{} {
return make([]int, 16)
},
}
item := sp.Get()
//打印可以看到,我们通过New返回的大小为16的[]int
fmt.Println("item : ", item)
//然后我们对item进行操作
//New()返回的是interface{},我们需要通过类型断言来转换
for i := 0; i < len(item.([]int)); i++ {
item.([]int)[i] = i
}
fmt.Println("item : ", item)
//使用完后,我们把item放回池中,让对象可以重用
sp.Put(item)
//再次从池中获取对象
item2 := sp.Get()
//注意这里获取的对象就是上面我们放回池中的对象
fmt.Println("item2 : ", item2)
//我们再次获取对象
item3 := sp.Get()
//因为池中的对象已经没有了,所以又重新通过New()创建一个新对象,放入池中,然后返回
//所以item3是大小为16的空[]int
fmt.Println("item3 : ", item3)
//测试sync.Pool保存socket长连接池
//testTcpConnPool()
}
输出如下:
item : [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
item : [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15]
item2 : [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15]
item3 : [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
注意 Pool不适用于有状态的数据
上面提到过:
因为对象池中的对象会被GC回收。所以说,有状态的对象,比如数据库连接、socket长连接等 是不能够用sync.Pool来实现的
package main
import (
"sync"
"net"
"fmt"
"runtime"
)
func main() {
sp2 := sync.Pool{
New: func() interface{} {
conn, err := net.Dial("tcp", "127.0.0.1:8888");
if err != nil {
return nil
}
return conn
},
}
buf := make([]byte, 1024)
//获取对象
conn := sp2.Get().(net.Conn)
//使用对象
conn.Write([]byte("GET / HTTP/1.1 \r\n\r\n"))
n, _ := conn.Read(buf)
fmt.Println("conn read : ", string(buf[:n]))
//打印conn的地址
fmt.Println("coon地址:",conn)
//把对象放回池中
sp2.Put(conn)
//我们人为的进行一次垃圾回收
runtime.GC()
//再次获取池中的对象
conn2 := sp2.Get().(net.Conn)
//这时发现conn2的地址与上面的conn的地址不一样了
//说明池中我们之前放回的对象被全部清除了,显然这并不是我们想看到的
//所以sync.Pool不适合用于scoket长连接或数据库连接池
fmt.Println("coon2地址",conn2)
}
推荐一开源的pool库:
另外,当理解这篇文章后,又看过源码解释,我建议去看看
极客时间 Go语言核心36讲 第33 sync.Pool,这篇文章,结合源码理解看看会又有新的理解,但不适合没看过源码的朋友,不然你会觉得:这是讲的什么鬼。。。