这个问题是因为golang运行时最大进行中线程数限制在10000个。
可以用创建线程池的方式限制同时运行的线程数量。
比如,带有缓冲的channel。
func doThing(d interface{}){
// 一些业务逻辑
}
func main() {
var data [1000]int // 假设有1000
poolSize := runtime.NumCPU() // 获取cpu核sem := make(chan struct{}, poolSize)
for _, d := range data {
sem <- struct{}{}
go func(d int){
doThing(d)
<-sem
}(d)
}
}
以上示例中,sem<- struct{}{}操作,在sem通道满的时候会暂停等待空出,因此保证里go func(d int)同时只有poolSize个。
import (
"context"
"golang.org/x/sync/semaphore"
)
func doThing(d interface{}){
// 一些操作
}
func main() {
data := [1000]int // 假设有1000个poolSize := runtime.NumCPU() // 获取cpu数核量sem := semaphore.NewWeighted(poolSize)
for _, d := range data {
sem.Acquire(context.Background(), 1) // 获取1个锁go func(d interface{}){
doThing(d)
sem.Release(1) // 释放1个锁
}(d)
}
}
以上示例基本思路与上一个channel缓冲示例一样,通过获取池子中的锁来控制并发数量。
Go实现并发的方法
Goroutine
func main(){
login()
go launch()
}
你还会发现,许多程序喜欢使用匿名函数来创建goroutine,如此代码中所示:
func main(){
login()
go func() {
launch()
}()
}
为了查看运行中的goroutine,让我们编写一个并发程序。
编写并发程序
package main
import (
"fmt"
"net/http"
"time"
)
func main() {
start := time.Now()
apis := []string{
"https://management.azure.com",
"https://dev.azure.com",
"https://api.github.com",
"https://outlook.office.com/",
"https://api.somewhereintheinternet.com/",
"https://graph.microsoft.com",
}
for _, api := range apis {
_, err := http.Get(api)
if err != nil {
fmt.Printf("ERROR: %s is down!\n", api)
continue
}
fmt.Printf("SUCCESS: %s is up and running!\n", api)
}
elapsed := time.Since(start)
fmt.Printf("Done! It took %v seconds!\n", elapsed.Seconds())
}
运行前面的代码时,将看到以下输出:
SUCCESS: https://management.azure.com is up and running!
SUCCESS: https://dev.azure.com is up and running!
SUCCESS: https://api.github.com is up and running!
SUCCESS: https://outlook.office.com/ is up and running!
ERROR: https://api.somewhereintheinternet.com/ is down!
SUCCESS: https://graph.microsoft.com is up and running!
Done! It took 1.658436834 seconds!
func checkAPI(api string) {
_, err := http.Get(api)
if err != nil {
fmt.Printf("ERROR: %s is down!\n", api)
return
}
fmt.Printf("SUCCESS: %s is up and running!\n", api)
}
注意,我们不再需要continue关键字,因为我们不在for循环中。 要停止函数的执行流,只需使用return关键字。 现在,我们需要修改main() 函数中的代码,为每个API创建一个goroutine,如下所示:
for _, api := range apis {
go checkAPI(api)
}
Done! It took 1.506e-05 seconds!
for _, api := range apis {
go checkAPI(api)
}
time.Sleep(3 * time.Second)
ERROR: https://api.somewhereintheinternet.com/ is down!
SUCCESS: https://api.github.com is up and running!
SUCCESS: https://management.azure.com is up and running!
SUCCESS: https://dev.azure.com is up and running!
SUCCESS: https://outlook.office.com/ is up and running!
SUCCESS: https://graph.microsoft.com is up and running!
Done! It took 3.002114573 seconds!
如果将WaitGroup结构添加到代码中,它可能会延迟main函数的执行,直到所有goroutine完成后。简单来说,它允许您设置一些必需的迭代,以便在允许应用程序继续之前从goroutine获得完整的响应。
Done递减WaitGroup计数器,直到WaitGroup计数器为零。
示例代码:
package main
import (
"fmt"
"time"
"sync"
)
type testConcurrency struct {
min int
max int
country string
}
func printCountry(test *testConcurrency, groupTest *sync.WaitGroup) {
for i :=test.max ; i>test.min; i-- {
time.Sleep(1*time.Millisecond)
fmt.Println(test.country)
}
fmt.Println()
groupTest.Done()
}
func main() {
groupTest := new(sync.WaitGroup)
japan := new(testConcurrency)
china := new(testConcurrency)
india := new(testConcurrency)
japan.country = "Japan"
japan.min = 0
japan.max = 5
china.country = "China"
china.min = 0
china.max = 5
india.country = "India"
india.min = 0
india.max = 5
go printCountry(japan, groupTest)
go printCountry(china, groupTest)
go printCountry(india, groupTest)
groupTest.Add(3)
groupTest.Wait()
}
输出:
Japan
India
China
India
Japan
China
Japan
India
China
India
Japan
China
Japan
India
China
Go标准库在运行时包中有一个名为GOMAXPROCS的函数,它允许我们指定调度程序要使用的逻辑处理器的数量。
如果我们为调度程序提供多个逻辑处理器来使用,我们将在示例程序的输出中看到不同的行为。如果你运行这个程序,你会看到goroutines是并行运行的。多个goroutine开始运行,显示中的字母和数字是混合的。输出基于在八核机器上运行程序,因此每个goroutine都在自己的内核上运行。
package main
import (
"fmt"
"runtime"
"sync"
)
func main() {
// Allocate three logical processors for the scheduler to use.
runtime.GOMAXPROCS(3)
// processTest is used to wait for the program to finish.
var processTest sync.WaitGroup
// Add a count of three, one for each goroutine.
processTest.Add(3)
// Declaration of three anonymous function and create a goroutine.
go func() {
defer processTest.Done()
for i := 0; i < 30; i++ {
for j := 51; j <= 100; j++ {
fmt.Printf(" %d", j)
if j == 100{
fmt.Println()
}
}
}
}()
go func() {
defer processTest.Done()
for j := 0; j < 10; j++ {
for char := 'A'; char < 'A'+26; char++ {
fmt.Printf("%c ", char)
if char == 'Z' {
fmt.Println()
}
}
}
}()
go func() {
defer processTest.Done()
for i := 0; i < 30; i++ {
for j := 0; j <= 50; j++ {
fmt.Printf(" %d", j)
if j == 50 {
fmt.Println()
}
}
}
}()
// Wait for the goroutines to finish.
processTest.Wait()
}
假设一支香烟需要三种成分来制作和吸烟:烟草、纸张和火柴。一张桌子周围有三个吸烟者,每个吸烟者都有三种成分中的一种——一个吸烟者有无限供应的烟草,另一个有纸,第三个有火柴。第四方,拥有无限供应的所有东西,随机选择一个吸烟者,并将香烟所需的用品放在桌子上。被选中的吸烟者吸烟,该过程应无限期重复。
示例代码:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
const (
paper = iota
grass
match
)
var smokeMap = map[int]string{
paper: "paper",
grass: "grass",
match: "match",
}
var names = map[int]string{
paper: "Sandy",
grass: "Apple",
match: "Daisy",
}
type Table struct {
paper chan int
grass chan int
match chan int
}
func arbitrate(t *Table, smokers [3]chan int) {
for {
time.Sleep(time.Millisecond * 500)
next := rand.Intn(3)
fmt.Printf("Table chooses %s: %s\n", smokeMap[next], names[next])
switch next {
case paper:
t.grass <- 1
t.match <- 1
case grass:
t.paper <- 1
t.match <- 1
case match:
t.grass <- 1
t.paper <- 1
}
for _, smoker := range smokers {
smoker <- next
}
wg.Add(1)
wg.Wait()
}
}
func smoker(t *Table, name string, smokes int, signal chan int) {
var chosen = -1
for {
chosen = <-signal // blocks
if smokes != chosen {
continue
}
fmt.Printf("Table: %d grass: %d match: %d\n", len(t.paper), len(t.grass), len(t.match))
select {
case <-t.paper:
case <-t.grass:
case <-t.match:
}
fmt.Printf("Table: %d grass: %d match: %d\n", len(t.paper), len(t.grass), len(t.match))
time.Sleep(10 * time.Millisecond)
select {
case <-t.paper:
case <-t.grass:
case <-t.match:
}
fmt.Printf("Table: %d grass: %d match: %d\n", len(t.paper), len(t.grass), len(t.match))
fmt.Printf("%s smokes a cigarette\n", name)
time.Sleep(time.Millisecond * 500)
wg.Done()
time.Sleep(time.Millisecond * 100)
}
}
const LIMIT = 1
var wg *sync.WaitGroup
func main() {
wg = new(sync.WaitGroup)
table := new(Table)
table.match = make(chan int, LIMIT)
table.paper = make(chan int, LIMIT)
table.grass = make(chan int, LIMIT)
var signals [3]chan int
// three smokers
for i := 0; i < 3; i++ {
signal := make(chan int, 1)
signals[i] = signal
go smoker(table, names[i], i, signal)
}
fmt.Printf("%s, %s, %s, sit with \n%s, %s, %s\n\n", names[0], names[1], names[2], smokeMap[0], smokeMap[1], smokeMap[2])
arbitrate(table, signals)
}
输出:
Sandy, Apple, Daisy, sit with
paper, grass, match
Table chooses match: Daisy
Table: 1 grass: 1 match: 0
Table: 1 grass: 0 match: 0
Table: 0 grass: 0 match: 0
Daisy smokes a cigarette
Table chooses paper: Sandy
Table: 0 grass: 1 match: 1
Table: 0 grass: 1 match: 0
Table: 0 grass: 0 match: 0
Sandy smokes a cigarette
Table chooses match: Daisy
Table: 1 grass: 1 match: 0
Table: 1 grass: 0 match: 0
Table: 0 grass: 0 match: 0
Daisy smokes a cigarette
一个理发店有N张沙发和一张理发椅。没有顾客要理发时,理发师便去睡觉。当一个顾客走进理发店时,如果所有的沙发都已被占用,他便离开理发店;否则,如果理发师正在为其他顾客理发,则该顾客就找一张空沙发坐下等待;如果理发师因无顾客正在睡觉,则由新到的顾客唤醒理发师为其理发。在理发完成后,顾客必须付费,直到理发师收费后才能离开理发店。
示例代码:
package main
import (
"fmt"
"sync"
"time"
)
const (
sleeping = iota
checking
cutting
)
var stateLog = map[int]string{
0: "Sleeping",
1: "Checking",
2: "Cutting",
}
var wg *sync.WaitGroup // Amount of potentional customers
type Barber struct {
name string
sync.Mutex
state int // Sleeping/Checking/Cutting
customer *Customer
}
type Customer struct {
name string
}
func (c *Customer) String() string {
return fmt.Sprintf("%p", c)[7:]
}
func NewBarber() (b *Barber) {
return &Barber{
name: "Sam",
state: sleeping,
}
}
// Barber goroutine
// Checks for customers
// Sleeps - wait for wakers to wake him up
func barber(b *Barber, wr chan *Customer, wakers chan *Customer) {
for {
b.Lock()
defer b.Unlock()
b.state = checking
b.customer = nil
// checking the waiting room
fmt.Printf("Checking waiting room: %d\n", len(wr))
time.Sleep(time.Millisecond * 100)
select {
case c := <-wr:
HairCut(c, b)
b.Unlock()
default: // Waiting room is empty
fmt.Printf("Sleeping Barber - %s\n", b.customer)
b.state = sleeping
b.customer = nil
b.Unlock()
c := <-wakers
b.Lock()
fmt.Printf("Woken by %s\n", c)
HairCut(c, b)
b.Unlock()
}
}
}
func HairCut(c *Customer, b *Barber) {
b.state = cutting
b.customer = c
b.Unlock()
fmt.Printf("Cutting %s hair\n", c)
time.Sleep(time.Millisecond * 100)
b.Lock()
wg.Done()
b.customer = nil
}
// customer goroutine
// just fizzles out if it's full, otherwise the customer
// is passed along to the channel handling it's haircut etc
func customer(c *Customer, b *Barber, wr chan<- *Customer, wakers chan<- *Customer) {
// arrive
time.Sleep(time.Millisecond * 50)
// Check on barber
b.Lock()
fmt.Printf("Customer %s checks %s barber | room: %d, w %d - customer: %s\n",
c, stateLog[b.state], len(wr), len(wakers), b.customer)
switch b.state {
case sleeping:
select {
case wakers <- c:
default:
select {
case wr <- c:
default:
wg.Done()
}
}
case cutting:
select {
case wr <- c:
default: // Full waiting room, leave shop
wg.Done()
}
case checking:
panic("Customer shouldn't check for the Barber when Barber is Checking the waiting room")
}
b.Unlock()
}
func main() {
b := NewBarber()
b.name = "Rocky"
WaitingRoom := make(chan *Customer, 5) // 5 chairs
Wakers := make(chan *Customer, 1) // Only one waker at a time
go barber(b, WaitingRoom, Wakers)
time.Sleep(time.Millisecond * 100)
wg = new(sync.WaitGroup)
n := 10
wg.Add(10)
// Spawn customers
for i := 0; i < n; i++ {
time.Sleep(time.Millisecond * 50)
c := new(Customer)
go customer(c, b, WaitingRoom, Wakers)
}
wg.Wait()
fmt.Println("No more customers for the day")
}
输出:
Checking waiting room: 0
Sleeping Barber -
Customer 120 checks Sleeping barber | room: 0, w 0 - customer:
Woken by 120
..............
..............
Checking waiting room: 0
No more customers for the day
该问题描述了两个进程,即生产者和使用者,它们共享一个用作队列的公共固定大小的缓冲区。生产者的工作是生成数据,将其放入缓冲区,然后重新开始。同时,消费者正在消耗数据(即,将其从缓冲区中删除),一次一个片段。问题在于确保生产者不会尝试将数据添加到缓冲区中(如果数据已满),并且使用者不会尝试从空缓冲区中删除数据。生产者的解决方案是,如果缓冲区已满,要么进入睡眠状态,要么丢弃数据。下次使用者从缓冲区中删除项目时,它会通知生产者,生产者再次开始填充缓冲区。同样,如果使用者发现缓冲区为空,则可以进入睡眠状态。下次生产者将数据放入缓冲区时,它会唤醒休眠的消费者。
示例代码:
package main
import (
"flag"
"fmt"
"log"
"os"
"runtime"
"runtime/pprof"
)
type Consumer struct {
msgs *chan int
}
// NewConsumer creates a Consumer
func NewConsumer(msgs *chan int) *Consumer {
return &Consumer{msgs: msgs}
}
// consume reads the msgs channel
func (c *Consumer) consume() {
fmt.Println("consume: Started")
for {
msg := <-*c.msgs
fmt.Println("consume: Received:", msg)
}
}
// Producer definition
type Producer struct {
msgs *chan int
done *chan bool
}
// NewProducer creates a Producer
func NewProducer(msgs *chan int, done *chan bool) *Producer {
return &Producer{msgs: msgs, done: done}
}
// produce creates and sends the message through msgs channel
func (p *Producer) produce(max int) {
fmt.Println("produce: Started")
for i := 0; i < max; i++ {
fmt.Println("produce: Sending ", i)
*p.msgs <- i
}
*p.done <- true // signal when done
fmt.Println("produce: Done")
}
func main() {
// profile flags
cpuprofile := flag.String("cpuprofile", "", "write cpu profile to `file`")
memprofile := flag.String("memprofile", "", "write memory profile to `file`")
// get the maximum number of messages from flags
max := flag.Int("n", 5, "defines the number of messages")
flag.Parse()
// utilize the max num of cores available
runtime.GOMAXPROCS(runtime.NumCPU())
// CPU Profile
if *cpuprofile != "" {
f, err := os.Create(*cpuprofile)
if err != nil {
log.Fatal("could not create CPU profile: ", err)
}
if err := pprof.StartCPUProfile(f); err != nil {
log.Fatal("could not start CPU profile: ", err)
}
defer pprof.StopCPUProfile()
}
var msgs = make(chan int) // channel to send messages
var done = make(chan bool) // channel to control when production is done
// Start a goroutine for Produce.produce
go NewProducer(&msgs, &done).produce(*max)
// Start a goroutine for Consumer.consume
go NewConsumer(&msgs).consume()
// Finish the program when the production is done
<-done
// Memory Profile
if *memprofile != "" {
f, err := os.Create(*memprofile)
if err != nil {
log.Fatal("could not create memory profile: ", err)
}
runtime.GC() // get up-to-date statistics
if err := pprof.WriteHeapProfile(f); err != nil {
log.Fatal("could not write memory profile: ", err)
}
f.Close()
}
}
输出:
consume: Started
produce: Started
produce: Sending 0
produce: Sending 1
consume: Received: 0
consume: Received: 1
produce: Sending 2
produce: Sending 3
consume: Received: 2
consume: Received: 3
produce: Sending 4
produce: Done
检查点同步是同步多个任务的问题。考虑一个车间,几个工人组装一些机制的细节。当他们每个人都完成他的工作时,他们会把细节放在一起。没有商店,所以先完成部分的工人必须等待其他人才能开始另一个。将细节放在一起是任务在路径分开之前自我同步的检查点。
示例代码:
package main
import (
"log"
"math/rand"
"sync"
"time"
)
func worker(part string) {
log.Println(part, "worker begins part")
time.Sleep(time.Duration(rand.Int63n(1e6)))
log.Println(part, "worker completes part")
wg.Done()
}
var (
partList = []string{"A", "B", "C", "D"}
nAssemblies = 3
wg sync.WaitGroup
)
func main() {
rand.Seed(time.Now().UnixNano())
for c := 1; c <= nAssemblies; c++ {
log.Println("begin assembly cycle", c)
wg.Add(len(partList))
for _, part := range partList {
go worker(part)
}
wg.Wait()
log.Println("assemble. cycle", c, "complete")
}
}
输出:
2019/07/15 16:10:32 begin assembly cycle 1
2019/07/15 16:10:32 D worker begins part
2019/07/15 16:10:32 A worker begins part
2019/07/15 16:10:32 B worker begins part
........
2019/07/15 16:10:32 D worker completes part
2019/07/15 16:10:32 C worker completes part
2019/07/15 16:10:32 assemble. cycle 3 complete
哲学家就餐问题可以这样表述,假设有五位哲学家围坐在一张圆形餐桌旁,做以下两件事情之一:吃饭,或者思考。吃东西的时候,他们就停止思考,思考的时候也停止吃东西。餐桌中间有一大碗意大利面,每两个哲学家之间有一只餐叉。因为用一只餐叉很难吃到意大利面,所以假设哲学家必须用两只餐叉吃东西。他们只能使用自己左右手边的那两只餐叉。哲学家就餐问题有时也用米饭和筷子而不是意大利面和餐叉来描述,因为很明显,吃米饭必须用两根筷子。
哲学家从来不交谈,这就很危险,可能产生死锁,每个哲学家都拿着左手的餐叉,永远都在等右边的餐叉(或者相反)。即使没有死锁,也有可能发生资源耗尽。例如,假设规定当哲学家等待另一只餐叉超过五分钟后就放下自己手里的那一只餐叉,并且再等五分钟后进行下一次尝试。这个策略消除了死锁(系统总会进入到下一个状态),但仍然有可能发生"活锁"。如果五位哲学家在完全相同的时刻进入餐厅,并同时拿起左边的餐叉,那么这些哲学家就会等待五分钟,同时放下手中的餐叉,再等五分钟,又同时拿起这些餐叉。
在实际的计算机问题中,缺乏餐叉可以类比为缺乏共享资源。一种常用的计算机技术是资源加锁,用来保证在某个时刻,资源只能被一个程序或一段代码访问。当一个程序想要使用的资源已经被另一个程序锁定,它就等待资源解锁。当多个程序涉及到加锁的资源时,在某些情况下就有可能发生死锁。例如,某个程序需要访问两个文件,当两个这样的程序各锁了一个文件,那它们都在等待对方解锁另一个文件,而这永远不会发生。
示例代码:main.go
package main
import (
"hash/fnv"
"log"
"math/rand"
"os"
"sync"
"time"
)
// Number of philosophers is simply the length of this list.
var ph = []string{"Mark", "Russell", "Rocky", "Haris", "Root"}
const hunger = 3 // Number of times each philosopher eats
const think = time.Second / 100 // Mean think time
const eat = time.Second / 100 // Mean eat time
var fmt = log.New(os.Stdout, "", 0)
var dining sync.WaitGroup
func diningProblem(phName string, dominantHand, otherHand *sync.Mutex) {
fmt.Println(phName, "Seated")
h := fnv.New64a()
h.Write([]byte(phName))
rg := rand.New(rand.NewSource(int64(h.Sum64())))
rSleep := func(t time.Duration) {
time.Sleep(t/2 + time.Duration(rg.Int63n(int64(t))))
}
for h := hunger; h > 0; h-- {
fmt.Println(phName, "Hungry")
dominantHand.Lock() // pick up forks
otherHand.Lock()
fmt.Println(phName, "Eating")
rSleep(eat)
dominantHand.Unlock() // put down forks
otherHand.Unlock()
fmt.Println(phName, "Thinking")
rSleep(think)
}
fmt.Println(phName, "Satisfied")
dining.Done()
fmt.Println(phName, "Left the table")
}
func main() {
fmt.Println("Table empty")
dining.Add(5)
fork0 := &sync.Mutex{}
forkLeft := fork0
for i := 1; i < len(ph); i++ {
forkRight := &sync.Mutex{}
go diningProblem(ph[i], forkLeft, forkRight)
forkLeft = forkRight
}
go diningProblem(ph[0], fork0, forkLeft)
dining.Wait() // wait for philosphers to finish
fmt.Println("Table empty")
}
输出:
Table empty
Mark seated
Mark Hungry
Mark Eating
..................
..................
Haris Thinking
Haris Satisfied
Haris Left the table
Table empty
下面的程序启动两个Goroutines。这两个Goroutine现在并发运行。我们创建了两个无缓冲的通道,并将它们传递给goroutines作为通道接收到的值的参数。
示例代码:main.go
package main
import (
"fmt"
)
func main() {
var intSlice = []int{91, 42, 23, 14, 15, 76, 87, 28, 19, 95}
chOdd := make(chan int)
chEven := make(chan int)
go odd(chOdd)
go even(chEven)
for _, value := range intSlice {
if value%2 != 0 {
chOdd <- value
} else {
chEven <- value
}
}
}
func odd(ch <-chan int) {
for v := range ch {
fmt.Println("ODD :", v)
}
}
func even(ch <-chan int) {
for v := range ch {
fmt.Println("EVEN:", v)
}
}
输出:
ODD : 91
ODD : 23
EVEN: 42
EVEN: 14
EVEN: 76
ODD : 15
ODD : 87
ODD : 19
ODD : 95
您在代码中调用go协程,但无法判断协程何时结束,值何时传递到缓冲通道。由于此代码是异步的,因此每当协程完成时,它都会将数据写入通道并在另一端读取。在上面的示例中,您只调用了两个go协程,因此行为是确定的,并且在大多数情况下会以某种方式生成相同的输出,但是当您增加go协程时,输出将不相同,并且顺序将不同,除非您使其同步。
示例代码:main.go
package main
import "fmt"
func sum(a []int, c chan int) {
sum := 0
for _, v := range a {
sum += v
}
c <- sum
}
func main() {
a := []int{17, 12, 18, 9, 24, 42, 64, 12, 68, 82, 57, 32, 9, 2, 12, 82, 52, 34, 92, 36}
c := make(chan int)
for i := 0; i < len(a); i = i + 5 {
go sum(a[i:i+5], c)
}
output := make([]int, 5)
for i := 0; i < 4; i++ {
output[i] = <-c
}
close(c)
fmt.Println(output)
}
输出:
[296 80 268 112 0]
main函数有两个无缓冲通道ch和quit。在斐波那契函数中,select语句会阻塞,直到其中一个情况准备就绪。
示例代码:main.go
package main
import (
"fmt"
)
func fibonacci(ch chan int, quit chan bool) {
x, y := 0, 1
for {
select {
case ch <- x: // write to channel ch
x, y = y, x+y
case <-quit:
fmt.Println("quit")
return
}
}
}
func main() {
ch := make(chan int)
quit := make(chan bool)
n := 10
go func(n int) {
for i := 0; i < n; i++ {
fmt.Println(<-ch) // read from channel ch
}
quit <- false
}(n)
fibonacci(ch, quit)
}
输出:
0
1
1
2
3
5
8
13
21
34
主要功能有两个功能:generator和receiver。我们创建一个c int通道并从生成器函数返回它。在匿名goroutine中运行的for循环将值写入通道c。
示例代码:main.go
package main
import (
"fmt"
)
func main() {
c := generator()
receiver(c)
}
func receiver(c <-chan int) {
for v := range c {
fmt.Println(v)
}
}
func generator() <-chan int {
c := make(chan int)
go func() {
for i := 0; i < 10; i++ {
c <- i
}
close(c)
}()
return c
}
输出:
0
1
2
3
4
5
6
7
8
9
该程序从10个Goroutines开始。我们创建了一个ch字符串通道,并通过同时运行10个goroutines将数据写入该通道。箭头相对于通道的方向指定是发送还是接收数据。指向ch的箭头指定我们要写入通道ch。箭头从ch向外指向指定我们正在从通道ch读取。
示例代码:main.go
package main
import (
"fmt"
"strconv"
)
func main() {
ch := make(chan string)
for i := 0; i < 10; i++ {
go func(i int) {
for j := 0; j < 10; j++ {
ch <- "Goroutine : " + strconv.Itoa(i)
}
}(i)
}
for k := 1; k <= 100; k++ {
fmt.Println(k, <-ch)
}
}
输出:
1 Goroutine : 9
2 Goroutine : 9
3 Goroutine : 2
4 Goroutine : 0
5 Goroutine : 1
6 Goroutine : 5
7 Goroutine : 3
8 Goroutine : 4
9 Goroutine : 7
10 Goroutine : 6
....................
goroutine是Go编程语言中的轻量级执行线程。它类似于其他编程语言中的线程,但它由Go运行时而不是操作系统管理。Goroutines允许在程序中并发执行函数,并且它们被设计为高效且可扩展。
在Go中,程序从单个goroutine(执行 main 函数)开始。可以使用go关键字后跟函数调用来创建其他goroutine。这将启动一个新的goroutine,该goroutine与原始goroutine同时运行。
Goroutines是非常轻量级的,可以在一个程序中创建数千甚至数百万个goroutine,而不会产生明显的开销。这使得在 Go 中编写并发程序变得容易,这些程序利用了多个CPU 内核,并且可以同时执行许多任务。
由于goroutines由Go运行时管理,因此它们会自动调度,并且可以使用通道相互通信。这使得编写复杂的并发程序变得容易,而不必担心锁定和同步等低级细节。
Goroutines使用通道相互通信,这是Go语言的内置功能。通道为goroutines提供了一种相互发送和接收值的方法,它们用于同步对共享数据的访问。
Goroutines是Go语言的一个关键特性,它们广泛用于并发和并行程序的设计中。它们使编写既高效又易于推理的代码变得容易。
新的goroutine由go语句创建。
要将函数作为goroutine运行,请调用以go语句为前缀的函数。下面是示例代码块:
sum() // 普通函数,主线程等待函数执行完
go sum() // goroutine函数,立即继续执行其他方法
go关键字使函数调用立即返回,而函数开始作为goroutine在后台运行,程序的其余部分继续执行。每个Golang程序的main函数都是使用goroutine启动的,因此每个Golang程序至少运行一个 goroutine。
创建 Goroutines
在每次调用函数responseSize之前添加了go关键字。三个responseSize goroutines同时启动,三个调用http。获取也是同时进行的。程序不会等到一个响应返回后再发送下一个请求。因此,使用 goroutines 可以更快地打印三种响应大小。
package main
import (
"fmt"
"io/ioutil"
"log"
"net/http"
"time"
)
func responseSize(url string) {
fmt.Println("Step1: ", url)
response, err := http.Get(url)
if err != nil {
log.Fatal(err)
}
fmt.Println("Step2: ", url)
defer response.Body.Close()
fmt.Println("Step3: ", url)
body, err := ioutil.ReadAll(response.Body)
if err != nil {
log.Fatal(err)
}
fmt.Println("Step4: ", len(body))
}
func main() {
go responseSize("https://www.golangprograms.com")
go responseSize("https://coderwall.com")
go responseSize("https://stackoverflow.com")
time.Sleep(10 * time.Second)
}
输出
Step1: https://www.golangprograms.com
Step1: https://stackoverflow.com
Step1: https://coderwall.com
Step2: https://stackoverflow.com
Step3: https://stackoverflow.com
Step4: 116749
Step2: https://www.golangprograms.com
Step3: https://www.golangprograms.com
Step4: 79551
Step2: https://coderwall.com
Step3: https://coderwall.com
Step4: 203842
我们添加了对时间的调用。在main函数中休眠,这可以防止主goroutines在responseSize goroutines完成之前退出。睡眠会让主程序睡10秒。
等待goroutines完成执行
同步sync包的WaitGroup类型用于等待程序完成从主函数启动的所有goroutine。它使用指定goroutines数量的计数器,Wait会阻止程序的执行,直到 aitGroup计数器为零。
Add方法用于向等待组添加计数器。
WaitGroup的Done方法是使用defer语句来递减WaitGroup计数器的。
WaitGroup类型的Wait方法等待程序完成所有goroutine。
Wait方法在main函数内部调用,该方法阻止执行,直到WaitGroup计数器达到零值并确保执行所有goroutine。
package main
import (
"fmt"
"io/ioutil"
"log"
"net/http"
"sync"
)
// 用于等待所有goroutine执行完
var wg sync.WaitGroup
func responseSize(url string) {
// 用于通知waitGroup当前goroutine执行完毕
defer wg.Done()
fmt.Println("Step1: ", url)
response, err := http.Get(url)
if err != nil {
log.Fatal(err)
}
fmt.Println("Step2: ", url)
defer response.Body.Close()
fmt.Println("Step3: ", url)
body, err := ioutil.ReadAll(response.Body)
if err != nil {
log.Fatal(err)
}
fmt.Println("Step4: ", len(body))
}
func main() {
// 告诉wg有3个goroutine需要等待
wg.Add(3)
fmt.Println("Start Goroutines")
go responseSize("https://www.golangprograms.com")
go responseSize("https://stackoverflow.com")
go responseSize("https://coderwall.com")
// 主线程等待所有goroutine执行完毕
wg.Wait()
fmt.Println("Terminating Program")
}
输出
Start Goroutines
Step1: https://coderwall.com
Step1: https://www.golangprograms.com
Step1: https://stackoverflow.com
Step2: https://stackoverflow.com
Step3: https://stackoverflow.com
Step4: 116749
Step2: https://www.golangprograms.com
Step3: https://www.golangprograms.com
Step4: 79801
Step2: https://coderwall.com
Step3: https://coderwall.com
Step4: 203842
Terminating Program
从 Goroutines 获取值
从goroutine获取值的最自然方法是通道。通道是连接并发goroutines的管道。您可以将值从一个goroutine发送到通道中,并将这些值接收到另一个goroutine或同步函数中。
package main
import (
"fmt"
"io/ioutil"
"log"
"net/http"
"sync"
)
var wg sync.WaitGroup
func responseSize(url string, nums chan int) {
defer wg.Done()
response, err := http.Get(url)
if err != nil {
log.Fatal(err)
}
defer response.Body.Close()
body, err := ioutil.ReadAll(response.Body)
if err != nil {
log.Fatal(err)
}
nums <- len(body)
}
func main() {
nums := make(chan int) // 声明一个无缓冲通道
wg.Add(1)
go responseSize("https://www.golangprograms.com", nums)
fmt.Println(<-nums) // 冲通道中读取值
wg.Wait()
close(nums) // 关闭通道
}
输出
79655
控制Goroutine的程序
使用通道,我们可以执行和暂停goroutine的流程。通道通过充当goroutines之间的管道来处理此通信。
package main
import (
"fmt"
"sync"
"time"
)
var i int
func work() {
time.Sleep(250 * time.Millisecond)
i++
fmt.Println(i)
}
func routine(command <-chan string, wg *sync.WaitGroup) {
defer wg.Done()
var status = "Play"
for {
select {
case cmd := <-command:
fmt.Println(cmd)
switch cmd {
case "Stop":
return
case "Pause":
status = "Pause"
default:
status = "Play"
}
default:
if status == "Play" {
work()
}
}
}
}
func main() {
var wg sync.WaitGroup
wg.Add(1)
command := make(chan string)
go routine(command, &wg)
time.Sleep(1 * time.Second)
command <- "Pause"
time.Sleep(1 * time.Second)
command <- "Play"
time.Sleep(1 * time.Second)
command <- "Stop"
wg.Wait()
}
输出
1
2
3
4
Pause
Play
5
6
7
8
9
Stop
使用原子函数修复竞争条件
由于对共享资源的访问不同步并尝试同时读取和写入该资源,因此会出现竞争条件。
原子函数提供低级锁定机制,用于同步对整数和指针的访问。原子函数通常用于修复竞争条件。
sync包下的atomic中的函数通过锁定对共享资源的访问来支持同步goroutine。
package main
import (
"fmt"
"runtime"
"sync"
"sync/atomic"
)
var (
counter int32
wg sync.WaitGroup
)
func main() {
wg.Add(3)
go increment("Python")
go increment("Java")
go increment("Golang")
wg.Wait()
fmt.Println("Counter:", counter)
}
func increment(name string) {
defer wg.Done()
for range name {
atomic.AddInt32(&counter, 1)
runtime.Gosched() // 让出当前goroutine执行权
}
}
原子包中的AddInt32函数通过强制一次只有一个goroutine可以执行和完成此添加操作来同步整数值的添加。当goroutines尝试调用任何原子函数时,它们会自动与引用的变量同步。
请注意,如果将代码行atomic.AddInt32(&counter, 1)替换为counter++,然后您将看到以下输出:
C:\Golang\goroutines>go run -race main.go
==================
WARNING: DATA RACE
Read at 0x0000006072b0 by goroutine 7:
main.increment()
C:/Golang/goroutines/main.go:31 +0x76
Previous write at 0x0000006072b0 by goroutine 8:
main.increment()
C:/Golang/goroutines/main.go:31 +0x90
Goroutine 7 (running) created at:
main.main()
C:/Golang/goroutines/main.go:18 +0x7e
Goroutine 8 (running) created at:
main.main()
C:/Golang/goroutines/main.go:19 +0x96
==================
Counter: 15
Found 1 data race(s)
exit status 66
C:\Golang\goroutines>
输出
C:\Golang\goroutines>go run -race main.go
Counter: 15
使用Mutex定义关键部分
互斥锁用于围绕代码创建一个关键部分,以确保一次只有一个goroutine可以执行该代码部分。
package main
import (
"fmt"
"sync"
)
var (
counter int32
wg sync.WaitGroup
mutex sync.Mutex // 用于定义需要保护的代码块,指该部分代码块保证每次只有一个线程访问。
)
func main() {
wg.Add(3)
go increment("Python")
go increment("Go Programming Language")
go increment("Java")
wg.Wait()
fmt.Println("Counter:", counter)
}
func increment(lang string) {
defer wg.Done()
for i := 0; i < 3; i++ {
mutex.Lock()
{
fmt.Println(lang)
counter++
}
mutex.Unlock()
}
}
输出
C:\Golang\goroutines>go run -race main.go
Python
Python
Python
Java
Java
Java
Go Programming Language
Go Programming Language
Go Programming Language
Counter: 9
C:\Golang\goroutines>
由对Lock()和Unlock()的调用定义的关键部分可针对计数器变量有保护操作。
Goroutine的中文名称是协程。
Goroutine是Go语言中的轻量级线程,由Go语言的运行时环境(runtime)管理。与操作系统线程相比,Goroutine更加轻量级,可以同时开启大量的Goroutine,且切换代价很小,能轻松利用硬件并发性。