golang runtime: program exceeds 10000-thread limit问题解决,控制goroutines数量

这个问题是因为golang运行时最大进行中线程数限制在10000个。

可以用创建线程池的方式限制同时运行的线程数量。

比如,带有缓冲的channel。

func doThing(d interface{}){
    // 一些业务逻辑
}
func main() {
    var data [1000]int // 假设有1000   
    poolSize := runtime.NumCPU() // 获取cpu核sem := make(chan struct{}, poolSize)
    for _, d := range data {
        sem <- struct{}{}
        go func(d int){
            doThing(d)
            <-sem
        }(d)
    }
}

以上示例中,sem<- struct{}{}操作,在sem通道满的时候会暂停等待空出,因此保证里go func(d int)同时只有poolSize个。

import (
    "context"
    "golang.org/x/sync/semaphore"
)

func doThing(d interface{}){
    // 一些操作
}

func main() {
    data := [1000]int // 假设有1000个poolSize := runtime.NumCPU() // 获取cpu数核量sem := semaphore.NewWeighted(poolSize)
    for _, d := range data {
        sem.Acquire(context.Background(), 1) // 获取1个锁go func(d interface{}){
            doThing(d)
            sem.Release(1) // 释放1个锁
        }(d)
    }
}

以上示例基本思路与上一个channel缓冲示例一样,通过获取池子中的锁来控制并发数量。

Go语言教程之边写边学:了解go中并发工作原理:了解goroutine
并发是独立活动的组合,就像Web服务器虽然同时处理多个用户请求,但它是自主运行的。 并发在当今的许多程序中都存在。 Web服务器就是一个例子,但你也能看到,在批量处理大量数据时也需要使用并发。
Go有两种编写并发程序的样式。 一种是在其他语言中通过线程实现的传统样式。 在本模块中,你将了解Go的样式,其中值是在称为goroutine的独立活动之间传递的,以与进程进行通信。
如果这是你第一次学习并发,我们建议你多花一些时间来查看我们将要编写的每一段代码,以进行实践。

Go实现并发的方法

通常,编写并发程序时最大的问题是在进程之间共享数据。 Go采用不同于其他编程语言的通信方式,因为Go是通过channel来回传递数据的。 此方法意味着只有一个活动 (goroutine) 有权访问数据,设计上不存在争用条件。 学完本模块中的goroutine和channel之后,你将更好地理解Go的并发方法。
Go的方法可以总结为以下口号:"不是通过共享内存通信,而是通过通信共享内存。"我们将在以下部分介绍此方法,但你也可以在Go博客文章通过通信共享内存中了解详细信息。
如前所述,Go还提供低级别的并发基元。 但在本模块中,我们只介绍Go的惯用并发方法。
让我们从探索goroutine开始。

Goroutine

goroutine是轻量线程中的并发活动,而不是在操作系统中进行的传统活动。 假设你有一个写入输出的程序和另一个计算两个数字相加的函数。 一个并发程序可以有数个goroutine同时调用这两个函数。
我们可以说,程序执行的第一个goroutine是main() 函数。 如果要创建其他goroutine,则必须在调用该函数之前使用go关键字,如下所示:
func main(){
    login()
    go launch()
}

你还会发现,许多程序喜欢使用匿名函数来创建goroutine,如此代码中所示:

func main(){
    login()
    go func() {
        launch()
    }()
}

为了查看运行中的goroutine,让我们编写一个并发程序。

编写并发程序

由于我们只想将重点放在并发部分,因此我们使用现有程序来检查API终结点是否响应。 代码如下:
package main

import (
    "fmt"
    "net/http"
    "time"
)

func main() {
    start := time.Now()

    apis := []string{
        "https://management.azure.com",
        "https://dev.azure.com",
        "https://api.github.com",
        "https://outlook.office.com/",
        "https://api.somewhereintheinternet.com/",
        "https://graph.microsoft.com",
    }

    for _, api := range apis {
        _, err := http.Get(api)
        if err != nil {
            fmt.Printf("ERROR: %s is down!\n", api)
            continue
        }

        fmt.Printf("SUCCESS: %s is up and running!\n", api)
    }

    elapsed := time.Since(start)
    fmt.Printf("Done! It took %v seconds!\n", elapsed.Seconds())
}

运行前面的代码时,将看到以下输出:

SUCCESS: https://management.azure.com is up and running!
SUCCESS: https://dev.azure.com is up and running!
SUCCESS: https://api.github.com is up and running!
SUCCESS: https://outlook.office.com/ is up and running!
ERROR: https://api.somewhereintheinternet.com/ is down!
SUCCESS: https://graph.microsoft.com is up and running!
Done! It took 1.658436834 seconds!
这里没有什么特别之处,但我们可以做得更好。 或许我们可以同时检查所有站点? 此程序可以在500毫秒的时间内完成,不需要耗费将近两秒。
请注意,我们需要并发运行的代码部分是向站点进行HTTP调用的部分。 换句话说,我们需要为程序要检查的每个API创建一个goroutine。
为了创建goroutine,我们需要在调用函数前使用go关键字。 但我们在这里没有函数。 让我们重构该代码并创建一个新函数,如下所示:
func checkAPI(api string) {
    _, err := http.Get(api)
    if err != nil {
        fmt.Printf("ERROR: %s is down!\n", api)
        return
    }

    fmt.Printf("SUCCESS: %s is up and running!\n", api)
}

注意,我们不再需要continue关键字,因为我们不在for循环中。 要停止函数的执行流,只需使用return关键字。 现在,我们需要修改main() 函数中的代码,为每个API创建一个goroutine,如下所示:

for _, api := range apis {
    go checkAPI(api)
}
重新运行程序,看看发生了什么。
看起来程序不再检查API了,对吗? 显示的内容可能与以下输出类似:
Done! It took 1.506e-05 seconds!
速度可真快! 发生了什么情况? 你会看到最后一条消息,指出程序已完成,因为Go为循环中的每个站点创建了一个goroutine,并且它立即转到下一行。
即使看起来checkAPI函数没有运行,它实际上是在运行。 它只是没有时间完成。 请注意,如果在循环之后添加一个睡眠计时器会发生什么,如下所示:
for _, api := range apis {
    go checkAPI(api)
}

time.Sleep(3 * time.Second)
现在,重新运行程序时,可能会看到如下所示的输出:
ERROR: https://api.somewhereintheinternet.com/ is down!
SUCCESS: https://api.github.com is up and running!
SUCCESS: https://management.azure.com is up and running!
SUCCESS: https://dev.azure.com is up and running!
SUCCESS: https://outlook.office.com/ is up and running!
SUCCESS: https://graph.microsoft.com is up and running!
Done! It took 3.002114573 seconds!
看起来似乎起作用了,对吧? 不完全如此。 如果你想在列表中添加一个新站点呢? 也许三秒钟是不够的。 你怎么知道? 你无法管理。 必须有更好的方法,这就是我们在下一节讨论channel时要涉及的内容。
Go语言教程之边写边学:基础练习:如何使用WaitGroup将main函数的执行延迟到所有goroutines完成后

如果将WaitGroup结构添加到代码中,它可能会延迟main函数的执行,直到所有goroutine完成后。简单来说,它允许您设置一些必需的迭代,以便在允许应用程序继续之前从goroutine获得完整的响应。
Done递减WaitGroup计数器,直到WaitGroup计数器为零。

 

示例代码:

package main

import (
	"fmt"
	"time"
	"sync"
)	

type testConcurrency struct {
	min int
	max int
	country string
}

func printCountry(test *testConcurrency, groupTest *sync.WaitGroup) {	
	for i :=test.max ; i>test.min; i-- {	
			time.Sleep(1*time.Millisecond)			
			fmt.Println(test.country)
	}

	fmt.Println()
	groupTest.Done()	
}

func  main() {
	groupTest := new(sync.WaitGroup)
	
	japan := new(testConcurrency)
	china := new(testConcurrency)
	india := new(testConcurrency)

	japan.country = "Japan"
	japan.min = 0
	japan.max = 5

	china.country = "China"
	china.min = 0
	china.max = 5

	india.country = "India"
	india.min = 0
	india.max = 5

	go printCountry(japan, groupTest)
	go printCountry(china, groupTest)
	go printCountry(india, groupTest)

	groupTest.Add(3)
	groupTest.Wait()
}

输出:

Japan
India
China
India
Japan
China
Japan
India
China
India
Japan
China
Japan

India

China
Go语言教程之边写边学:基础练习:如何创建多个goroutine,如何使用三个逻辑处理器

Go标准库在运行时包中有一个名为GOMAXPROCS的函数,它允许我们指定调度程序要使用的逻辑处理器的数量。

如果我们为调度程序提供多个逻辑处理器来使用,我们将在示例程序的输出中看到不同的行为。如果你运行这个程序,你会看到goroutines是并行运行的。多个goroutine开始运行,显示中的字母和数字是混合的。输出基于在八核机器上运行程序,因此每个goroutine都在自己的内核上运行。

package main

import (
	"fmt"
	"runtime"
	"sync"
)

func main() {
	// Allocate three logical processors for the scheduler to use.
	runtime.GOMAXPROCS(3)

	// processTest is used to wait for the program to finish.
	var processTest sync.WaitGroup
	// Add a count of three, one for each goroutine.
	processTest.Add(3)
	
	// Declaration of three anonymous function and create a goroutine.
	go func() {
		defer processTest.Done()
		for i := 0; i < 30; i++ {
			for j := 51; j <= 100; j++ {
				fmt.Printf(" %d", j)
				if j == 100{
					fmt.Println()
				}
			}
		}
	}()
	go func() {
		defer processTest.Done()
		for j := 0; j < 10; j++ {
			for char := 'A'; char < 'A'+26; char++ {
				fmt.Printf("%c ", char)
				if char == 'Z' {
					fmt.Println()
				}

			}
		}
	}()
	go func() {
		defer processTest.Done()
		for i := 0; i < 30; i++ {
			for j := 0; j <= 50; j++ {
				fmt.Printf(" %d", j)
				if j == 50 {
					fmt.Println()
				}
			}
		}
	}()

	// Wait for the goroutines to finish.
	processTest.Wait()	
}
Go语言教程之边写边学:Goroutines和Channels练习:吸烟者问题

假设一支香烟需要三种成分来制作和吸烟:烟草、纸张和火柴。一张桌子周围有三个吸烟者,每个吸烟者都有三种成分中的一种——一个吸烟者有无限供应的烟草,另一个有纸,第三个有火柴。第四方,拥有无限供应的所有东西,随机选择一个吸烟者,并将香烟所需的用品放在桌子上。被选中的吸烟者吸烟,该过程应无限期重复。

 

示例代码:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

const (
	paper = iota
	grass
	match
)

var smokeMap = map[int]string{
	paper: "paper",
	grass: "grass",
	match: "match",
}

var names = map[int]string{
	paper: "Sandy",
	grass: "Apple",
	match: "Daisy",
}

type Table struct {
	paper chan int
	grass chan int
	match chan int
}

func arbitrate(t *Table, smokers [3]chan int) {
	for {
		time.Sleep(time.Millisecond * 500)
		next := rand.Intn(3)
		fmt.Printf("Table chooses %s: %s\n", smokeMap[next], names[next])
		switch next {
		case paper:
			t.grass <- 1
			t.match <- 1
		case grass:
			t.paper <- 1
			t.match <- 1
		case match:
			t.grass <- 1
			t.paper <- 1
		}
		for _, smoker := range smokers {
			smoker <- next
		}
		wg.Add(1)
		wg.Wait()
	}
}

func smoker(t *Table, name string, smokes int, signal chan int) {
	var chosen = -1
	for {
		chosen = <-signal // blocks

		if smokes != chosen {
			continue
		}

		fmt.Printf("Table: %d grass: %d match: %d\n", len(t.paper), len(t.grass), len(t.match))
		select {
		case <-t.paper:
		case <-t.grass:
		case <-t.match:
		}
		fmt.Printf("Table: %d grass: %d match: %d\n", len(t.paper), len(t.grass), len(t.match))
		time.Sleep(10 * time.Millisecond)
		select {
		case <-t.paper:
		case <-t.grass:
		case <-t.match:
		}
		fmt.Printf("Table: %d grass: %d match: %d\n", len(t.paper), len(t.grass), len(t.match))
		fmt.Printf("%s smokes a cigarette\n", name)
		time.Sleep(time.Millisecond * 500)
		wg.Done()
		time.Sleep(time.Millisecond * 100)
	}
}

const LIMIT = 1

var wg *sync.WaitGroup

func main() {
	wg = new(sync.WaitGroup)
	table := new(Table)
	table.match = make(chan int, LIMIT)
	table.paper = make(chan int, LIMIT)
	table.grass = make(chan int, LIMIT)
	var signals [3]chan int
	// three smokers
	for i := 0; i < 3; i++ {
		signal := make(chan int, 1)
		signals[i] = signal
		go smoker(table, names[i], i, signal)
	}
	fmt.Printf("%s, %s, %s, sit with \n%s, %s, %s\n\n", names[0], names[1], names[2], smokeMap[0], smokeMap[1], smokeMap[2])
	arbitrate(table, signals)
}

 

输出:

Sandy, Apple, Daisy, sit with
paper, grass, match

Table chooses match: Daisy
Table: 1 grass: 1 match: 0
Table: 1 grass: 0 match: 0
Table: 0 grass: 0 match: 0
Daisy smokes a cigarette
Table chooses paper: Sandy
Table: 0 grass: 1 match: 1
Table: 0 grass: 1 match: 0
Table: 0 grass: 0 match: 0
Sandy smokes a cigarette
Table chooses match: Daisy
Table: 1 grass: 1 match: 0
Table: 1 grass: 0 match: 0
Table: 0 grass: 0 match: 0
Daisy smokes a cigarette
Go语言教程之边写边学:Goroutines和Channels练习:睡眠理发师问题

一个理发店有N张沙发和一张理发椅。没有顾客要理发时,理发师便去睡觉。当一个顾客走进理发店时,如果所有的沙发都已被占用,他便离开理发店;否则,如果理发师正在为其他顾客理发,则该顾客就找一张空沙发坐下等待;如果理发师因无顾客正在睡觉,则由新到的顾客唤醒理发师为其理发。在理发完成后,顾客必须付费,直到理发师收费后才能离开理发店。

示例代码:

package main

import (
	"fmt"
	"sync"
	"time"
)

const (
	sleeping = iota
	checking
	cutting
)

var stateLog = map[int]string{
	0: "Sleeping",
	1: "Checking",
	2: "Cutting",
}
var wg *sync.WaitGroup // Amount of potentional customers

type Barber struct {
	name string
	sync.Mutex
	state    int // Sleeping/Checking/Cutting
	customer *Customer
}

type Customer struct {
	name string
}

func (c *Customer) String() string {
	return fmt.Sprintf("%p", c)[7:]
}

func NewBarber() (b *Barber) {
	return &Barber{
		name:  "Sam",
		state: sleeping,
	}
}

// Barber goroutine
// Checks for customers
// Sleeps - wait for wakers to wake him up
func barber(b *Barber, wr chan *Customer, wakers chan *Customer) {
	for {
		b.Lock()
		defer b.Unlock()
		b.state = checking
		b.customer = nil

		// checking the waiting room
		fmt.Printf("Checking waiting room: %d\n", len(wr))
		time.Sleep(time.Millisecond * 100)
		select {
		case c := <-wr:
			HairCut(c, b)
			b.Unlock()
		default: // Waiting room is empty
			fmt.Printf("Sleeping Barber - %s\n", b.customer)
			b.state = sleeping
			b.customer = nil
			b.Unlock()
			c := <-wakers
			b.Lock()
			fmt.Printf("Woken by %s\n", c)
			HairCut(c, b)
			b.Unlock()
		}
	}
}

func HairCut(c *Customer, b *Barber) {
	b.state = cutting
	b.customer = c
	b.Unlock()
	fmt.Printf("Cutting  %s hair\n", c)
	time.Sleep(time.Millisecond * 100)
	b.Lock()
	wg.Done()
	b.customer = nil
}

// customer goroutine
// just fizzles out if it's full, otherwise the customer
// is passed along to the channel handling it's haircut etc
func customer(c *Customer, b *Barber, wr chan<- *Customer, wakers chan<- *Customer) {
	// arrive
	time.Sleep(time.Millisecond * 50)
	// Check on barber
	b.Lock()
	fmt.Printf("Customer %s checks %s barber | room: %d, w %d - customer: %s\n",
		c, stateLog[b.state], len(wr), len(wakers), b.customer)
	switch b.state {
	case sleeping:
		select {
		case wakers <- c:
		default:
			select {
			case wr <- c:
			default:
				wg.Done()
			}
		}
	case cutting:
		select {
		case wr <- c:
		default: // Full waiting room, leave shop
			wg.Done()
		}
	case checking:
		panic("Customer shouldn't check for the Barber when Barber is Checking the waiting room")
	}
	b.Unlock()
}

func main() {
	b := NewBarber()
	b.name = "Rocky"
	WaitingRoom := make(chan *Customer, 5) // 5 chairs
	Wakers := make(chan *Customer, 1)      // Only one waker at a time
	go barber(b, WaitingRoom, Wakers)

	time.Sleep(time.Millisecond * 100)
	wg = new(sync.WaitGroup)
	n := 10
	wg.Add(10)
	// Spawn customers
	for i := 0; i < n; i++ {
		time.Sleep(time.Millisecond * 50)
		c := new(Customer)
		go customer(c, b, WaitingRoom, Wakers)
	}

	wg.Wait()
	fmt.Println("No more customers for the day")
}

 

输出:

Checking waiting room: 0
Sleeping Barber - 
Customer 120 checks Sleeping barber | room: 0, w 0 - customer: 
Woken by 120
..............
..............
Checking waiting room: 0
No more customers for the day
Go语言教程之边写边学:Goroutines和Channels练习:生产者消费者问题

该问题描述了两个进程,即生产者和使用者,它们共享一个用作队列的公共固定大小的缓冲区。生产者的工作是生成数据,将其放入缓冲区,然后重新开始。同时,消费者正在消耗数据(即,将其从缓冲区中删除),一次一个片段。问题在于确保生产者不会尝试将数据添加到缓冲区中(如果数据已满),并且使用者不会尝试从空缓冲区中删除数据。生产者的解决方案是,如果缓冲区已满,要么进入睡眠状态,要么丢弃数据。下次使用者从缓冲区中删除项目时,它会通知生产者,生产者再次开始填充缓冲区。同样,如果使用者发现缓冲区为空,则可以进入睡眠状态。下次生产者将数据放入缓冲区时,它会唤醒休眠的消费者。

 

示例代码:

package main

import (
	"flag"
	"fmt"
	"log"
	"os"
	"runtime"
	"runtime/pprof"
)

type Consumer struct {
	msgs *chan int
}

// NewConsumer creates a Consumer
func NewConsumer(msgs *chan int) *Consumer {
	return &Consumer{msgs: msgs}
}

// consume reads the msgs channel
func (c *Consumer) consume() {
	fmt.Println("consume: Started")
	for {
		msg := <-*c.msgs
		fmt.Println("consume: Received:", msg)
	}
}

// Producer definition
type Producer struct {
	msgs *chan int
	done *chan bool
}

// NewProducer creates a Producer
func NewProducer(msgs *chan int, done *chan bool) *Producer {
	return &Producer{msgs: msgs, done: done}
}

// produce creates and sends the message through msgs channel
func (p *Producer) produce(max int) {
	fmt.Println("produce: Started")
	for i := 0; i < max; i++ {
		fmt.Println("produce: Sending ", i)
		*p.msgs <- i
	}
	*p.done <- true // signal when done
	fmt.Println("produce: Done")
}

func main() {
	// profile flags
	cpuprofile := flag.String("cpuprofile", "", "write cpu profile to `file`")
	memprofile := flag.String("memprofile", "", "write memory profile to `file`")

	// get the maximum number of messages from flags
	max := flag.Int("n", 5, "defines the number of messages")

	flag.Parse()

	// utilize the max num of cores available
	runtime.GOMAXPROCS(runtime.NumCPU())

	// CPU Profile
	if *cpuprofile != "" {
		f, err := os.Create(*cpuprofile)
		if err != nil {
			log.Fatal("could not create CPU profile: ", err)
		}
		if err := pprof.StartCPUProfile(f); err != nil {
			log.Fatal("could not start CPU profile: ", err)
		}
		defer pprof.StopCPUProfile()
	}

	var msgs = make(chan int)  // channel to send messages
	var done = make(chan bool) // channel to control when production is done

	// Start a goroutine for Produce.produce
	go NewProducer(&msgs, &done).produce(*max)

	// Start a goroutine for Consumer.consume
	go NewConsumer(&msgs).consume()

	// Finish the program when the production is done
	<-done

	// Memory Profile
	if *memprofile != "" {
		f, err := os.Create(*memprofile)
		if err != nil {
			log.Fatal("could not create memory profile: ", err)
		}
		runtime.GC() // get up-to-date statistics
		if err := pprof.WriteHeapProfile(f); err != nil {
			log.Fatal("could not write memory profile: ", err)
		}
		f.Close()
	}
}

 

输出:

consume: Started
produce: Started
produce: Sending  0
produce: Sending  1
consume: Received: 0
consume: Received: 1
produce: Sending  2
produce: Sending  3
consume: Received: 2
consume: Received: 3
produce: Sending  4
produce: Done
Go语言教程之边写边学:Goroutines和Channels练习:检查点同步问题

检查点同步是同步多个任务的问题。考虑一个车间,几个工人组装一些机制的细节。当他们每个人都完成他的工作时,他们会把细节放在一起。没有商店,所以先完成部分的工人必须等待其他人才能开始另一个。将细节放在一起是任务在路径分开之前自我同步的检查点。

 

示例代码:

package main

import (
	"log"
	"math/rand"
	"sync"
	"time"
)

func worker(part string) {
	log.Println(part, "worker begins part")
	time.Sleep(time.Duration(rand.Int63n(1e6)))
	log.Println(part, "worker completes part")
	wg.Done()
}

var (
	partList    = []string{"A", "B", "C", "D"}
	nAssemblies = 3
	wg          sync.WaitGroup
)

func main() {
	rand.Seed(time.Now().UnixNano())
	for c := 1; c <= nAssemblies; c++ {
		log.Println("begin assembly cycle", c)
		wg.Add(len(partList))
		for _, part := range partList {
			go worker(part)
		}
		wg.Wait()
		log.Println("assemble.  cycle", c, "complete")
	}
}

 

输出:

2019/07/15 16:10:32 begin assembly cycle 1
2019/07/15 16:10:32 D worker begins part
2019/07/15 16:10:32 A worker begins part
2019/07/15 16:10:32 B worker begins part
........
2019/07/15 16:10:32 D worker completes part
2019/07/15 16:10:32 C worker completes part
2019/07/15 16:10:32 assemble.  cycle 3 complete

 

Go语言教程之边写边学:Goroutines和Channels练习:哲学家就餐问题

 

哲学家就餐问题可以这样表述,假设有五位哲学家围坐在一张圆形餐桌旁,做以下两件事情之一:吃饭,或者思考。吃东西的时候,他们就停止思考,思考的时候也停止吃东西。餐桌中间有一大碗意大利面,每两个哲学家之间有一只餐叉。因为用一只餐叉很难吃到意大利面,所以假设哲学家必须用两只餐叉吃东西。他们只能使用自己左右手边的那两只餐叉。哲学家就餐问题有时也用米饭和筷子而不是意大利面和餐叉来描述,因为很明显,吃米饭必须用两根筷子。

哲学家从来不交谈,这就很危险,可能产生死锁,每个哲学家都拿着左手的餐叉,永远都在等右边的餐叉(或者相反)。即使没有死锁,也有可能发生资源耗尽。例如,假设规定当哲学家等待另一只餐叉超过五分钟后就放下自己手里的那一只餐叉,并且再等五分钟后进行下一次尝试。这个策略消除了死锁(系统总会进入到下一个状态),但仍然有可能发生"活锁"。如果五位哲学家在完全相同的时刻进入餐厅,并同时拿起左边的餐叉,那么这些哲学家就会等待五分钟,同时放下手中的餐叉,再等五分钟,又同时拿起这些餐叉。

在实际的计算机问题中,缺乏餐叉可以类比为缺乏共享资源。一种常用的计算机技术是资源加锁,用来保证在某个时刻,资源只能被一个程序或一段代码访问。当一个程序想要使用的资源已经被另一个程序锁定,它就等待资源解锁。当多个程序涉及到加锁的资源时,在某些情况下就有可能发生死锁。例如,某个程序需要访问两个文件,当两个这样的程序各锁了一个文件,那它们都在等待对方解锁另一个文件,而这永远不会发生。

 

 

 

示例代码:main.go

package main

import (
	"hash/fnv"
	"log"
	"math/rand"
	"os"
	"sync"
	"time"
)

// Number of philosophers is simply the length of this list.
var ph = []string{"Mark", "Russell", "Rocky", "Haris", "Root"}

const hunger = 3                // Number of times each philosopher eats
const think = time.Second / 100 // Mean think time
const eat = time.Second / 100   // Mean eat time

var fmt = log.New(os.Stdout, "", 0)

var dining sync.WaitGroup

func diningProblem(phName string, dominantHand, otherHand *sync.Mutex) {
	fmt.Println(phName, "Seated")
	h := fnv.New64a()
	h.Write([]byte(phName))
	rg := rand.New(rand.NewSource(int64(h.Sum64())))
	rSleep := func(t time.Duration) {
		time.Sleep(t/2 + time.Duration(rg.Int63n(int64(t))))
	}
	for h := hunger; h > 0; h-- {
		fmt.Println(phName, "Hungry")
		dominantHand.Lock() // pick up forks
		otherHand.Lock()
		fmt.Println(phName, "Eating")
		rSleep(eat)
		dominantHand.Unlock() // put down forks
		otherHand.Unlock()
		fmt.Println(phName, "Thinking")
		rSleep(think)
	}
	fmt.Println(phName, "Satisfied")
	dining.Done()
	fmt.Println(phName, "Left the table")
}

func main() {
	fmt.Println("Table empty")
	dining.Add(5)
	fork0 := &sync.Mutex{}
	forkLeft := fork0
	for i := 1; i < len(ph); i++ {
		forkRight := &sync.Mutex{}
		go diningProblem(ph[i], forkLeft, forkRight)
		forkLeft = forkRight
	}
	go diningProblem(ph[0], fork0, forkLeft)
	dining.Wait() // wait for philosphers to finish
	fmt.Println("Table empty")
}

 

输出:

Table empty
Mark seated
Mark Hungry
Mark Eating
..................
..................
Haris Thinking
Haris Satisfied
Haris Left the table
Table empty
Go语言教程之边写边学:Goroutines和Channels练习:查找奇数和偶数

下面的程序启动两个Goroutines。这两个Goroutine现在并发运行。我们创建了两个无缓冲的通道,并将它们传递给goroutines作为通道接收到的值的参数。

 

示例代码:main.go

package main

import (
	"fmt"
)

func main() {
	var intSlice = []int{91, 42, 23, 14, 15, 76, 87, 28, 19, 95}
	chOdd := make(chan int)
	chEven := make(chan int)

	go odd(chOdd)
	go even(chEven)

	for _, value := range intSlice {
		if value%2 != 0 {
			chOdd <- value
		} else {
			chEven <- value
		}
	}

}

func odd(ch <-chan int) {
	for v := range ch {
		fmt.Println("ODD :", v)
	}
}

func even(ch <-chan int) {
	for v := range ch {
		fmt.Println("EVEN:", v)
	}
}

 

输出:

ODD : 91
ODD : 23
EVEN: 42
EVEN: 14
EVEN: 76
ODD : 15
ODD : 87
ODD : 19
ODD : 95
Go语言教程之边写边学:Goroutines和Channels练习:Goroutines通道执行顺序

您在代码中调用go协程,但无法判断协程何时结束,值何时传递到缓冲通道。由于此代码是异步的,因此每当协程完成时,它都会将数据写入通道并在另一端读取。在上面的示例中,您只调用了两个go协程,因此行为是确定的,并且在大多数情况下会以某种方式生成相同的输出,但是当您增加go协程时,输出将不相同,并且顺序将不同,除非您使其同步。

 

示例代码:main.go

package main

import "fmt"

func sum(a []int, c chan int) {
	sum := 0
	for _, v := range a {
		sum += v
	}
	c <- sum
}

func main() {
	a := []int{17, 12, 18, 9, 24, 42, 64, 12, 68, 82, 57, 32, 9, 2, 12, 82, 52, 34, 92, 36}

	c := make(chan int)
	for i := 0; i < len(a); i = i + 5 {
		go sum(a[i:i+5], c)
	}
	output := make([]int, 5)
	for i := 0; i < 4; i++ {
		output[i] = <-c
	}
	close(c)

	fmt.Println(output)
}

 

输出:

[296 80 268 112 0]
Go语言教程之边写边学:Goroutines和Channels练习:将斐波那契数列读写到通道

main函数有两个无缓冲通道ch和quit。在斐波那契函数中,select语句会阻塞,直到其中一个情况准备就绪。

示例代码:main.go

package main

import (
	"fmt"
)

func fibonacci(ch chan int, quit chan bool) {
	x, y := 0, 1
	for {
		select {
		case ch <- x: // write to channel ch
			x, y = y, x+y
		case <-quit:
			fmt.Println("quit")
			return
		}
	}
}

func main() {
	ch := make(chan int)
	quit := make(chan bool)
	n := 10

	go func(n int) {
		for i := 0; i < n; i++ {
			fmt.Println(<-ch) // read from channel ch
		}
		quit <- false
	}(n)

	fibonacci(ch, quit)
}

 

输出:

0
1
1
2
3
5
8
13
21
34
Go语言教程之边写边学:Goroutines和Channels练习:从通道发送和接收值

主要功能有两个功能:generator和receiver。我们创建一个c int通道并从生成器函数返回它。在匿名goroutine中运行的for循环将值写入通道c。

 

示例代码:main.go

package main

import (
	"fmt"
)

func main() {
	c := generator()
	receiver(c)
}

func receiver(c <-chan int) {
	for v := range c {
		fmt.Println(v)
	}
}

func generator() <-chan int {
	c := make(chan int)

	go func() {
		for i := 0; i < 10; i++ {
			c <- i
		}
		close(c)
	}()

	return c
}

 

输出:

0
1
2
3
4
5
6
7
8
9
Go语言教程之边写边学:Goroutines和Channels练习:启动多个Goroutine,每个goroutine都向一个频道添加值

该程序从10个Goroutines开始。我们创建了一个ch字符串通道,并通过同时运行10个goroutines将数据写入该通道。箭头相对于通道的方向指定是发送还是接收数据。指向ch的箭头指定我们要写入通道ch。箭头从ch向外指向指定我们正在从通道ch读取。

 

示例代码:main.go

package main

import (
	"fmt"
	"strconv"
)

func main() {
	ch := make(chan string)

	for i := 0; i < 10; i++ {
		go func(i int) {
			for j := 0; j < 10; j++ {
				ch <- "Goroutine : " + strconv.Itoa(i)
			}
		}(i)
	}

	for k := 1; k <= 100; k++ {
		fmt.Println(k, <-ch)
	}
}

 

输出:

1 Goroutine : 9
2 Goroutine : 9
3 Goroutine : 2
4 Goroutine : 0
5 Goroutine : 1
6 Goroutine : 5
7 Goroutine : 3
8 Goroutine : 4
9 Goroutine : 7
10 Goroutine : 6
....................
Go语言教程之边写边学:协程 Goroutine

goroutine是Go编程语言中的轻量级执行线程。它类似于其他编程语言中的线程,但它由Go运行时而不是操作系统管理。Goroutines允许在程序中并发执行函数,并且它们被设计为高效且可扩展。

在Go中,程序从单个goroutine(执行 main 函数)开始。可以使用go关键字后跟函数调用来创建其他goroutine。这将启动一个新的goroutine,该goroutine与原始goroutine同时运行。

Goroutines是非常轻量级的,可以在一个程序中创建数千甚至数百万个goroutine,而不会产生明显的开销。这使得在 Go 中编写并发程序变得容易,这些程序利用了多个CPU 内核,并且可以同时执行许多任务。

由于goroutines由Go运行时管理,因此它们会自动调度,并且可以使用通道相互通信。这使得编写复杂的并发程序变得容易,而不必担心锁定和同步等低级细节。

Goroutines使用通道相互通信,这是Go语言的内置功能。通道为goroutines提供了一种相互发送和接收值的方法,它们用于同步对共享数据的访问。

Goroutines是Go语言的一个关键特性,它们广泛用于并发和并行程序的设计中。它们使编写既高效又易于推理的代码变得容易。

新的goroutine由go语句创建。

要将函数作为goroutine运行,请调用以go语句为前缀的函数。下面是示例代码块:

sum()     // 普通函数,主线程等待函数执行完
go sum()  // goroutine函数,立即继续执行其他方法

go关键字使函数调用立即返回,而函数开始作为goroutine在后台运行,程序的其余部分继续执行。每个Golang程序的main函数都是使用goroutine启动的,因此每个Golang程序至少运行一个 goroutine。

 

创建 Goroutines

在每次调用函数responseSize之前添加了go关键字。三个responseSize goroutines同时启动,三个调用http。获取也是同时进行的。程序不会等到一个响应返回后再发送下一个请求。因此,使用 goroutines 可以更快地打印三种响应大小。

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"time"
)

func responseSize(url string) {
	fmt.Println("Step1: ", url)
	response, err := http.Get(url)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println("Step2: ", url)
	defer response.Body.Close()

	fmt.Println("Step3: ", url)
	body, err := ioutil.ReadAll(response.Body)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("Step4: ", len(body))
}

func main() {
	go responseSize("https://www.golangprograms.com")
	go responseSize("https://coderwall.com")
	go responseSize("https://stackoverflow.com")
	time.Sleep(10 * time.Second)
}

输出

Step1:  https://www.golangprograms.com
Step1:  https://stackoverflow.com
Step1:  https://coderwall.com
Step2:  https://stackoverflow.com
Step3:  https://stackoverflow.com
Step4:  116749
Step2:  https://www.golangprograms.com
Step3:  https://www.golangprograms.com
Step4:  79551
Step2:  https://coderwall.com
Step3:  https://coderwall.com
Step4:  203842

我们添加了对时间的调用。在main函数中休眠,这可以防止goroutines在responseSize goroutines完成之前退出。睡眠会让程序睡10秒。

 

等待goroutines完成执行

同步sync包的WaitGroup类型用于等待程序完成从主函数启动的所有goroutine。它使用指定goroutines数量的计数器,Wait会阻止程序的执行,直到 aitGroup计数器为零。

Add方法用于向等待组添加计数器。

WaitGroup的Done方法是使用defer语句来递减WaitGroup计数器的。

WaitGroup类型的Wait方法等待程序完成所有goroutine。

Wait方法在main函数内部调用,该方法阻止执行,直到WaitGroup计数器达到零值并确保执行所有goroutine。

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"sync"
)

// 用于等待所有goroutine执行完
var wg sync.WaitGroup

func responseSize(url string) {
	// 用于通知waitGroup当前goroutine执行完毕
	defer wg.Done()

	fmt.Println("Step1: ", url)
	response, err := http.Get(url)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println("Step2: ", url)
	defer response.Body.Close()

	fmt.Println("Step3: ", url)
	body, err := ioutil.ReadAll(response.Body)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("Step4: ", len(body))
}

func main() {
	// 告诉wg有3个goroutine需要等待
	wg.Add(3)
	fmt.Println("Start Goroutines")

	go responseSize("https://www.golangprograms.com")
	go responseSize("https://stackoverflow.com")
	go responseSize("https://coderwall.com")

	// 主线程等待所有goroutine执行完毕
	wg.Wait()
	fmt.Println("Terminating Program")
}

输出

Start Goroutines
Step1:  https://coderwall.com
Step1:  https://www.golangprograms.com
Step1:  https://stackoverflow.com
Step2:  https://stackoverflow.com
Step3:  https://stackoverflow.com
Step4:  116749
Step2:  https://www.golangprograms.com
Step3:  https://www.golangprograms.com
Step4:  79801
Step2:  https://coderwall.com
Step3:  https://coderwall.com
Step4:  203842
Terminating Program

 

从 Goroutines 获取值

从goroutine获取值的最自然方法是通道。通道是连接并发goroutines的管道。您可以将值从一个goroutine发送到通道中,并将这些值接收到另一个goroutine或同步函数中。

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"sync"
)

var wg sync.WaitGroup

func responseSize(url string, nums chan int) {
	defer wg.Done()

	response, err := http.Get(url)
	if err != nil {
		log.Fatal(err)
	}
	defer response.Body.Close()
	body, err := ioutil.ReadAll(response.Body)
	if err != nil {
		log.Fatal(err)
	}

	nums <- len(body)
}

func main() {
	nums := make(chan int) // 声明一个无缓冲通道
	wg.Add(1)
	go responseSize("https://www.golangprograms.com", nums)
	fmt.Println(<-nums) // 冲通道中读取值
	wg.Wait()
	close(nums) // 关闭通道
}

输出

79655

 

控制Goroutine的程序

使用通道,我们可以执行和暂停goroutine的流程。通道通过充当goroutines之间的管道来处理此通信。

package main

import (
	"fmt"
	"sync"
	"time"
)

var i int

func work() {
	time.Sleep(250 * time.Millisecond)
	i++
	fmt.Println(i)
}

func routine(command <-chan string, wg *sync.WaitGroup) {
	defer wg.Done()
	var status = "Play"
	for {
		select {
		case cmd := <-command:
			fmt.Println(cmd)
			switch cmd {
			case "Stop":
				return
			case "Pause":
				status = "Pause"
			default:
				status = "Play"
			}
		default:
			if status == "Play" {
				work()
			}
		}
	}
}

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	command := make(chan string)
	go routine(command, &wg)

	time.Sleep(1 * time.Second)
	command <- "Pause"

	time.Sleep(1 * time.Second)
	command <- "Play"

	time.Sleep(1 * time.Second)
	command <- "Stop"

	wg.Wait()
}

输出

1
2
3
4
Pause
Play
5
6
7
8
9
Stop

 

使用原子函数修复竞争条件

由于对共享资源的访问不同步并尝试同时读取和写入该资源,因此会出现竞争条件。

原子函数提供低级锁定机制,用于同步对整数和指针的访问。原子函数通常用于修复竞争条件。

sync包下的atomic中的函数通过锁定对共享资源的访问来支持同步goroutine。

package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

var (
	counter int32          
	wg      sync.WaitGroup
)

func main() {
	wg.Add(3)

	go increment("Python")
	go increment("Java")
	go increment("Golang")

	wg.Wait()
	fmt.Println("Counter:", counter)

}

func increment(name string) {
	defer wg.Done()

	for range name {
		atomic.AddInt32(&counter, 1)
		runtime.Gosched() // 让出当前goroutine执行权
	}
}

原子包中的AddInt32函数通过强制一次只有一个goroutine可以执行和完成此添加操作来同步整数值的添加。当goroutines尝试调用任何原子函数时,它们会自动与引用的变量同步。

请注意,如果将代码行atomic.AddInt32(&counter, 1)替换为counter++,然后您将看到以下输出:

C:\Golang\goroutines>go run -race main.go
==================
WARNING: DATA RACE
Read at 0x0000006072b0 by goroutine 7:
  main.increment()
      C:/Golang/goroutines/main.go:31 +0x76

Previous write at 0x0000006072b0 by goroutine 8:
  main.increment()
      C:/Golang/goroutines/main.go:31 +0x90

Goroutine 7 (running) created at:
  main.main()
      C:/Golang/goroutines/main.go:18 +0x7e

Goroutine 8 (running) created at:
  main.main()
      C:/Golang/goroutines/main.go:19 +0x96
==================
Counter: 15
Found 1 data race(s)
exit status 66

C:\Golang\goroutines>

输出

C:\Golang\goroutines>go run -race main.go
Counter: 15


 

使用Mutex定义关键部分

互斥锁用于围绕代码创建一个关键部分,以确保一次只有一个goroutine可以执行该代码部分。

package main

import (
	"fmt"
	"sync"
)

var (
	counter int32          
	wg      sync.WaitGroup 
	mutex   sync.Mutex     // 用于定义需要保护的代码块,指该部分代码块保证每次只有一个线程访问。
)

func main() {
	wg.Add(3)

	go increment("Python")
	go increment("Go Programming Language")
	go increment("Java")

	wg.Wait()
	fmt.Println("Counter:", counter)

}

func increment(lang string) {
	defer wg.Done()

	for i := 0; i < 3; i++ {
		mutex.Lock()
		{
			fmt.Println(lang)
			counter++
		}
		mutex.Unlock()
	}
}

输出

C:\Golang\goroutines>go run -race main.go
Python
Python
Python
Java
Java
Java
Go Programming Language
Go Programming Language
Go Programming Language
Counter: 9

C:\Golang\goroutines>

由对Lock()Unlock()的调用定义的关键部分可针对计数器变量有保护操作。

Goroutine的中文名称是什么?

Goroutine的中文名称是协程

Goroutine是Go语言中的轻量级线程,由Go语言的运行时环境(runtime)管理。与操作系统线程相比,Goroutine更加轻量级,可以同时开启大量的Goroutine,且切换代价很小,能轻松利用硬件并发性。

  • 当前日期:
  • 北京时间:
  • 时间戳:
  • 今年的第:18周
  • 我的 IP:3.149.10.88
农历
五行
冲煞
彭祖
方位
吉神
凶神
极简任务管理 help
+ 0 0 0
Task Idea Collect