Go语言教程之边写边学:了解go中并发工作原理:挑战:利用并发方法更快地计算斐波纳契数
对于这个挑战,你需要通过提高现有程序的运行速度来改进它。 尝试自己编写程序,即使你不得不回头查看你以前用于练习的示例也要尝试。 然后,将你的解决方案与下一单元中的解决方案进行比较。
Go中的并发是一个复杂的问题,在实践中你会更好地理解它。 这一挑战只是你可以用来实践的一个建议。
祝好运!

利用并发方法更快地计算斐波纳契数

使用以下程序按顺序计算斐波纳契数:
package main

import (
    "fmt"
    "math/rand"
    "time"
)

func fib(number float64) float64 {
    x, y := 1.0, 1.0
    for i := 0; i < int(number); i++ {
        x, y = y, x+y
    }

    r := rand.Intn(3)
    time.Sleep(time.Duration(r) * time.Second)

    return x
}

func main() {
    start := time.Now()

    for i := 1; i < 15; i++ {
        n := fib(float64(i))
    fmt.Printf("Fib(%v): %v\n", i, n)
    }

    elapsed := time.Since(start)
    fmt.Printf("Done! It took %v seconds!\n", elapsed.Seconds())
}
你需要根据现有代码构建两个程序:
实现并发的改进版本。 完成此操作需要几秒钟的时间(不超过15秒),就像现在这样。 应使用有缓冲channel。
编写一个新版本以计算斐波纳契数,直到用户使用fmt.Scanf() 函数在终端中输入quit。 如果用户按Enter,则应计算新的斐波纳契数。 换句话说,你将不再有从1到10的循环。
使用两个无缓冲channel:一个用于计算斐波纳契数,另一个用于等待用户的"退出"消息。 你需要使用select语句。
下面是与程序进行交互的示例:
1

1

2

3

5

8

13
quit
Done calculating Fibonacci!
Done! It took 12.043196415 seconds!

实现并发并使程序的运行速度更快的改进版本如下所示:

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func fib(number float64, ch chan string) {
    x, y := 1.0, 1.0
    for i := 0; i < int(number); i++ {
        x, y = y, x+y
    }

    r := rand.Intn(3)
    time.Sleep(time.Duration(r) * time.Second)

    ch <- fmt.Sprintf("Fib(%v): %v\n", number, x)
}

func main() {
    start := time.Now()

    size := 15
    ch := make(chan string, size)

    for i := 0; i < size; i++ {
        go fib(float64(i), ch)
    }

    for i := 0; i < size; i++ {
        fmt.Printf(<-ch)
    }

    elapsed := time.Since(start)
    fmt.Printf("Done! It took %v seconds!\n", elapsed.Seconds())
}

输出:

Fib(14): 610
Fib(8): 34
Fib(1): 1
Fib(5): 8
Fib(0): 1
Fib(12): 233
Fib(2): 2
Fib(13): 377
Fib(6): 13
Fib(7): 21
Fib(4): 5
Fib(3): 3
Fib(10): 89
Fib(9): 55
Fib(11): 144
Done! It took 2 seconds!

使用两个无缓冲channel的程序的第二个版本如下所示:

package main

import (
    "fmt"
    "time"
)

var quit = make(chan bool)

func fib(c chan int) {
    x, y := 1, 1

    for {
        select {
            case c <- x:
                x, y = y, x+y
            case <-quit:
                fmt.Println("Done calculating Fibonacci!")
            return
        }
    }
}

func main() {
    start := time.Now()

    command := ""
    data := make(chan int)

    go fib(data)

    for {
        num := <-data
        fmt.Println(num)
        fmt.Scanf("%s", &command)
        if num > 10000 {
            quit <- true
            break
        }
    }

    time.Sleep(1 * time.Second)

    elapsed := time.Since(start)
    fmt.Printf("Done! It took %v seconds!\n", elapsed.Seconds())
}

输出:

1
1
2
3
5
8
13
21
34
55
89
144
233
377
610
987
1597
2584
4181
6765
10946
Done calculating Fibonacci!
Done! It took 1 seconds!
Go语言教程之边写边学:了解go中并发工作原理:了解有缓冲channel
正如你所了解的,默认情况下channel是无缓冲行为。 这意味着只有存在接收操作时,它们才接受发送操作。 否则,程序将永久被阻止等待。
有时需要在goroutine之间进行此类同步。 但是,有时你可能只需要实现并发,而不需要限制goroutine之间的通信方式。
有缓冲channel在不阻止程序的情况下发送和接收数据,因为有缓冲channel的行为类似于队列。 创建channel时,可以限制此队列的大小,如下所示:
ch := make(chan string, 10)
每次向channel发送数据时,都会将元素添加到队列中。 然后,接收操作将从队列中删除该元素。 当channel已满时,任何发送操作都将等待,直到有空间保存数据。 相反,如果channel是空的且存在读取操作,程序则会被阻止,直到有数据要读取。
下面是一个理解有缓冲channel的简单示例:
package main

import (
    "fmt"
)

func send(ch chan string, message string) {
    ch <- message
}

func main() {
    size := 4
    ch := make(chan string, size)
    send(ch, "one")
    send(ch, "two")
    send(ch, "three")
    send(ch, "four")
    fmt.Println("All data sent to the channel ...")

    for i := 0; i < size; i++ {
        fmt.Println(<-ch)
    }

    fmt.Println("Done!")
}

运行程序时,将看到以下输出:

All data sent to the channel ...
one
two
three
four
Done!

你可能会说我们在这里没有做任何不同的操作,你是对的。 但是让我们看看当你将size变量更改为一个更小的数字(你甚至可以尝试使用一个更大的数字)时会发生什么情况,如下所示:

size := 2

重新运行程序时,将看到以下错误:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.send(...)
        /Users/developer/go/src/concurrency/main.go:8
main.main()
        /Users/developer/go/src/concurrency/main.go:16 +0xf3
exit status 2
出现此错误是因为对send函数的调用是连续的。 你不是在创建新的goroutine。 因此,没有任何要排队的操作。
channel与goroutine有着紧密的联系。 如果没有另一个goroutine从channel接收数据,则整个程序可能会永久处于被阻止状态。 正如你所见,这种情况确实会发生。
现在让我们进行一些有趣的实践! 我们将为最后两次调用创建goroutine (前两次调用正确适应缓冲区),并运行for循环四次。 代码如下:
func main() {
    size := 2
    ch := make(chan string, size)
    send(ch, "one")
    send(ch, "two")
    go send(ch, "three")
    go send(ch, "four")
    fmt.Println("All data sent to the channel ...")

    for i := 0; i < 4; i++ {
        fmt.Println(<-ch)
    }

    fmt.Println("Done!")
}
运行程序时,它按预期工作。 我们建议在使用channel时始终使用goroutine。
让我们测试一下创建的缓冲通道的元素超出所需量的情况。 我们将使用之前用于检查API的示例,并创建大小为10的缓冲通道:
package main

import (
    "fmt"
    "net/http"
    "time"
)

func main() {
    start := time.Now()

    apis := []string{
        "https://management.azure.com",
        "https://dev.azure.com",
        "https://api.github.com",
        "https://outlook.office.com/",
        "https://api.somewhereintheinternet.com/",
        "https://graph.microsoft.com",
    }

    ch := make(chan string, 10)

    for _, api := range apis {
        go checkAPI(api, ch)
    }

    for i := 0; i < len(apis); i++ {
        fmt.Print(<-ch)
    }

    elapsed := time.Since(start)
    fmt.Printf("Done! It took %v seconds!\n", elapsed.Seconds())
}

func checkAPI(api string, ch chan string) {
    _, err := http.Get(api)
    if err != nil {
        ch <- fmt.Sprintf("ERROR: %s is down!\n", api)
        return
    }

    ch <- fmt.Sprintf("SUCCESS: %s is up and running!\n", api)
}
运行程序时,将看到与以前相同的输出。 可以更改channel的大小,用更小或更大的数字进行测试,程序仍能正常运行。

无缓冲channel与有缓冲channel

现在,你可能想知道何时使用这两种类型。 这完全取决于你希望goroutine之间的通信如何进行。 无缓冲channel同步通信。 它们保证每次发送数据时,程序都会被阻止,直到有人从channel中读取数据。
相反,有缓冲channel将发送和接收操作解耦。 它们不会阻止程序,但你必须小心使用,因为可能最终会导致死锁(如前文所述)。 使用无缓冲channel时,可以控制可并发运行的goroutine的数量。 例如,你可能要对API进行调用,并且想要控制每秒执行的调用次数。 否则,你可能会被阻止。

Channel方向

Go中的通道具有另一个有趣的功能。 在使用通道作为函数的参数时,可以指定通道是要"发送"数据还是"接收"数据。 随着程序的增长,可能会使用大量的函数,这时候,最好记录每个channel的意图,以便正确使用它们。 或者,你要编写一个库,并希望将channel公开为只读,以保持数据一致性。
要定义channel的方向,可以使用与读取或接收数据时类似的方式进行定义。 但是你在函数参数中声明channel时执行此操作。 将通道类型定义为函数中的参数的语法如下所示:
chan<- int // 仅用于写入数据的chan
<-chan int // 仅用于读取数据的chan
通过仅接收的channel发送数据时,在编译程序时会出现错误。
让我们使用以下程序作为两个函数的示例,一个函数用于读取数据,另一个函数用于发送数据:
package main

import "fmt"

func send(ch chan<- string, message string) {
    fmt.Printf("Sending: %#v\n", message)
    ch <- message
}

func read(ch <-chan string) {
    fmt.Printf("Receiving: %#v\n", <-ch)
}

func main() {
    ch := make(chan string, 1)
    send(ch, "Hello World!")
    read(ch)
}

运行程序时,将看到以下输出:

Sending: "Hello World!"
Receiving: "Hello World!"

程序阐明每个函数中每个channel的意图。 如果试图使用一个channel在一个仅用于接收数据的channel中发送数据,将会出现编译错误。 例如,尝试执行如下所示的操作:

func read(ch <-chan string) {
    fmt.Printf("Receiving: %#v\n", <-ch)
    ch <- "Bye!"
}

运行程序时,将看到以下错误:

# command-line-arguments
./main.go:12:5: invalid operation: ch <- "Bye!" (send to receive-only type <-chan string)

编译错误总比误用channel好。

多路复用

最后,让我们讨论如何使用select关键字与多个通道同时交互。 有时,在使用多个channel时,需要等待事件发生。 例如,当程序正在处理的数据中出现异常时,可以包含一些逻辑来取消操作。
select语句的工作方式类似于switch语句,但它适用于channel。 它会阻止程序的执行,直到它收到要处理的事件。 如果它收到多个事件,则会随机选择一个。
select语句的一个重要方面是,它在处理事件后完成执行。 如果要等待更多事件发生,则可能需要使用循环。
让我们使用以下程序来看看select的运行情况:
package main

import (
    "fmt"
    "time"
)

func process(ch chan string) {
    time.Sleep(3 * time.Second)
    ch <- "Done processing!"
}

func replicate(ch chan string) {
    time.Sleep(1 * time.Second)
    ch <- "Done replicating!"
}

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)
    go process(ch1)
    go replicate(ch2)

    for i := 0; i < 2; i++ {
        select {
        case process := <-ch1:
            fmt.Println(process)
        case replicate := <-ch2:
            fmt.Println(replicate)
        }
    }
}

运行程序时,将看到以下输出:

Done replicating!
Done processing!

请注意,replicate函数首先完成,这就是首先在终端中看到其输出的原因。 main函数存在一个循环,因为select语句在收到事件后立即结束,但我们仍在等待process函数完成。

Go语言教程之边写边学:了解go中并发工作原理:将channel用作通信机制
Go中的channel是goroutine之间的通信机制。 请记住Go的并发方法是:"不是通过共享内存通信;而是通过通信共享内存。"当你需要将值从一个goroutine发送到另一个时,可以使用通道。 让我们看看它们的工作原理,以及如何开始使用它们来编写并发Go程序。

Channel语法

由于channel是发送和接收数据的通信机制,因此它也有类型之分。 这意味着你只能发送channel支持的数据类型。 除使用关键字chan作为channel的数据类型外,还需指定将通过channel传递的数据类型,如int类型。
每次声明一个channel或希望在函数中指定一个channel作为参数时,都需要使用chan <type>,如chan int。 若要创建通道,需使用内置的make() 函数:
ch := make(chan int)
一个channel可以执行两项操作:发送数据和接收数据。 若要指定channel具有的操作类型,需要使用channel运算符 <-。 此外,在channel中发送数据和接收数据属于阻止操作。 你一会儿就会明白为何如此。
如果希望通道仅发送数据,请在通道之后使用 <- 运算符。 如果希望通道接收数据,请在通道之前使用 <- 运算符,如下所示:
ch <- x // 向ch写入数据x = <-ch // 从ch读取数据
<-ch // 从ch读取数据但不做任何处理

可在channel中执行的另一项操作是关闭channel。 若要关闭通道,使用内置的close() 函数:

close(ch)
当你关闭通道时,你希望数据将不再在该通道中发送。 如果试图将数据发送到已关闭的channel,则程序将发生严重错误。 如果试图通过已关闭的channel接收数据,则可以读取发送的所有数据。 随后的每次"读取"都将返回一个零值。
让我们回到之前创建的程序,然后使用通道来删除睡眠功能。 首先,让我们在main函数中创建一个字符串channel,如下所示:
ch := make(chan string)
接下来,删除睡眠行time.Sleep(3 * time.Second)。
现在,我们可以使用channel在goroutine之间进行通信。 应重构代码并通过通道发送该消息,而不是在checkAPI函数中打印结果。 要使用该函数中的channel,需要添加channel作为参数。 checkAPI函数应如下所示:
func checkAPI(api string, ch chan string) {
    _, err := http.Get(api)
    if err != nil {
        ch <- fmt.Sprintf("ERROR: %s is down!\n", api)
        return
    }

    ch <- fmt.Sprintf("SUCCESS: %s is up and running!\n", api)
}
请注意,我们必须使用fmt.Sprintf函数,因为我们不想打印任何文本,只需利用通道发送格式化文本。 另请注意,我们在channel变量之后使用 <- 运算符来发送数据。
现在,你需要更改main函数以发送channel变量并接收要打印的数据,如下所示:
ch := make(chan string)

for _, api := range apis {
    go checkAPI(api, ch)
}

fmt.Print(<-ch)
请注意,我们在channel之前使用 <- 运算符来表明我们想要从channel读取数据。
重新运行程序时,会看到如下所示的输出:
ERROR: https://api.somewhereintheinternet.com/ is down!

Done! It took 0.007401217 seconds!
至少它不用调用睡眠函数就可以工作,对吧? 但它仍然没有达到我们的目的。 我们只看到其中一个goroutine的输出,而我们共创建了五个goroutine。 在下一节中,我们来看看这个程序为什么是这样工作的。

无缓冲channel

使用make() 函数创建channel时,会创建一个无缓冲channel,这是默认行为。 无缓冲channel会阻止发送操作,直到有人准备好接收数据。 正如我们之前所说,发送和接收都属于阻止操作。 此阻止操作也是上一节中的程序在收到第一条消息后立即停止的原因。
我们可以说fmt.Print(<-ch) 会阻止程序,因为它从channel读取,并等待一些数据到达。 一旦有任何数据到达,它就会继续下一行,然后程序完成。
其他goroutine发生了什么? 它们仍在运行,但都没有在侦听。 而且,由于程序提前完成,一些goroutine无法发送数据。 为了证明这一点,让我们添加另一个fmt.Print(<-ch),如下所示:
ch := make(chan string)

for _, api := range apis {
    go checkAPI(api, ch)
}

fmt.Print(<-ch)
fmt.Print(<-ch)

重新运行程序时,会看到如下所示的输出:

ERROR: https://api.somewhereintheinternet.com/ is down!
SUCCESS: https://api.github.com is up and running!
Done! It took 0.263611711 seconds!

请注意,现在你会看到两个API的输出。 如果继续添加更多fmt.Print(<-ch) 行,你最终将会读取发送到channel的所有数据。 但是如果你试图读取更多数据,而没有goroutine再发送数据,会发生什么呢? 例如:

ch := make(chan string)

for _, api := range apis {
    go checkAPI(api, ch)
}

fmt.Print(<-ch)
fmt.Print(<-ch)
fmt.Print(<-ch)
fmt.Print(<-ch)
fmt.Print(<-ch)
fmt.Print(<-ch)

fmt.Print(<-ch)

重新运行程序时,会看到如下所示的输出:

ERROR: https://api.somewhereintheinternet.com/ is down!
SUCCESS: https://api.github.com is up and running!
SUCCESS: https://management.azure.com is up and running!
SUCCESS: https://graph.microsoft.com is up and running!
SUCCESS: https://outlook.office.com/ is up and running!
SUCCESS: https://dev.azure.com is up and running!
它在运行,但程序未完成。 最后一个打印行阻止了程序,因为它需要接收数据。 必须使用类似Ctrl+C的命令关闭程序。
上个示例只是证明了读取数据和接收数据都属于阻止操作。 要解决此问题,可以将代码更改为for循环,并只接收确定要发送的数据,如下所示:
for i := 0; i < len(apis); i++ {
    fmt.Print(<-ch)
}

以下是程序的最终版本,以防你的版本出错:

package main

import (
    "fmt"
    "net/http"
    "time"
)

func main() {
    start := time.Now()

    apis := []string{
        "https://management.azure.com",
        "https://dev.azure.com",
        "https://api.github.com",
        "https://outlook.office.com/",
        "https://api.somewhereintheinternet.com/",
        "https://graph.microsoft.com",
    }

    ch := make(chan string)

    for _, api := range apis {
        go checkAPI(api, ch)
    }

    for i := 0; i < len(apis); i++ {
        fmt.Print(<-ch)
    }

    elapsed := time.Since(start)
    fmt.Printf("Done! It took %v seconds!\n", elapsed.Seconds())
}

func checkAPI(api string, ch chan string) {
    _, err := http.Get(api)
    if err != nil {
        ch <- fmt.Sprintf("ERROR: %s is down!\n", api)
        return
    }

    ch <- fmt.Sprintf("SUCCESS: %s is up and running!\n", api)
}

重新运行程序时,会看到如下所示的输出:

ERROR: https://api.somewhereintheinternet.com/ is down!
SUCCESS: https://api.github.com is up and running!
SUCCESS: https://management.azure.com is up and running!
SUCCESS: https://dev.azure.com is up and running!
SUCCESS: https://graph.microsoft.com is up and running!
SUCCESS: https://outlook.office.com/ is up and running!
Done! It took 0.602099714 seconds!
程序正在执行应执行的操作。 你不再使用睡眠函数,而是使用通道。 另请注意,在不使用并发时,现在需要约600毫秒完成,而不会耗费近2秒。
最后,我们可以说,无缓冲channel在同步发送和接收操作。 即使使用并发,通信也是同步的。
Go语言教程之边写边学:了解go中并发工作原理:了解goroutine
并发是独立活动的组合,就像Web服务器虽然同时处理多个用户请求,但它是自主运行的。 并发在当今的许多程序中都存在。 Web服务器就是一个例子,但你也能看到,在批量处理大量数据时也需要使用并发。
Go有两种编写并发程序的样式。 一种是在其他语言中通过线程实现的传统样式。 在本模块中,你将了解Go的样式,其中值是在称为goroutine的独立活动之间传递的,以与进程进行通信。
如果这是你第一次学习并发,我们建议你多花一些时间来查看我们将要编写的每一段代码,以进行实践。

Go实现并发的方法

通常,编写并发程序时最大的问题是在进程之间共享数据。 Go采用不同于其他编程语言的通信方式,因为Go是通过channel来回传递数据的。 此方法意味着只有一个活动 (goroutine) 有权访问数据,设计上不存在争用条件。 学完本模块中的goroutine和channel之后,你将更好地理解Go的并发方法。
Go的方法可以总结为以下口号:"不是通过共享内存通信,而是通过通信共享内存。"我们将在以下部分介绍此方法,但你也可以在Go博客文章通过通信共享内存中了解详细信息。
如前所述,Go还提供低级别的并发基元。 但在本模块中,我们只介绍Go的惯用并发方法。
让我们从探索goroutine开始。

Goroutine

goroutine是轻量线程中的并发活动,而不是在操作系统中进行的传统活动。 假设你有一个写入输出的程序和另一个计算两个数字相加的函数。 一个并发程序可以有数个goroutine同时调用这两个函数。
我们可以说,程序执行的第一个goroutine是main() 函数。 如果要创建其他goroutine,则必须在调用该函数之前使用go关键字,如下所示:
func main(){
    login()
    go launch()
}

你还会发现,许多程序喜欢使用匿名函数来创建goroutine,如此代码中所示:

func main(){
    login()
    go func() {
        launch()
    }()
}

为了查看运行中的goroutine,让我们编写一个并发程序。

编写并发程序

由于我们只想将重点放在并发部分,因此我们使用现有程序来检查API终结点是否响应。 代码如下:
package main

import (
    "fmt"
    "net/http"
    "time"
)

func main() {
    start := time.Now()

    apis := []string{
        "https://management.azure.com",
        "https://dev.azure.com",
        "https://api.github.com",
        "https://outlook.office.com/",
        "https://api.somewhereintheinternet.com/",
        "https://graph.microsoft.com",
    }

    for _, api := range apis {
        _, err := http.Get(api)
        if err != nil {
            fmt.Printf("ERROR: %s is down!\n", api)
            continue
        }

        fmt.Printf("SUCCESS: %s is up and running!\n", api)
    }

    elapsed := time.Since(start)
    fmt.Printf("Done! It took %v seconds!\n", elapsed.Seconds())
}

运行前面的代码时,将看到以下输出:

SUCCESS: https://management.azure.com is up and running!
SUCCESS: https://dev.azure.com is up and running!
SUCCESS: https://api.github.com is up and running!
SUCCESS: https://outlook.office.com/ is up and running!
ERROR: https://api.somewhereintheinternet.com/ is down!
SUCCESS: https://graph.microsoft.com is up and running!
Done! It took 1.658436834 seconds!
这里没有什么特别之处,但我们可以做得更好。 或许我们可以同时检查所有站点? 此程序可以在500毫秒的时间内完成,不需要耗费将近两秒。
请注意,我们需要并发运行的代码部分是向站点进行HTTP调用的部分。 换句话说,我们需要为程序要检查的每个API创建一个goroutine。
为了创建goroutine,我们需要在调用函数前使用go关键字。 但我们在这里没有函数。 让我们重构该代码并创建一个新函数,如下所示:
func checkAPI(api string) {
    _, err := http.Get(api)
    if err != nil {
        fmt.Printf("ERROR: %s is down!\n", api)
        return
    }

    fmt.Printf("SUCCESS: %s is up and running!\n", api)
}

注意,我们不再需要continue关键字,因为我们不在for循环中。 要停止函数的执行流,只需使用return关键字。 现在,我们需要修改main() 函数中的代码,为每个API创建一个goroutine,如下所示:

for _, api := range apis {
    go checkAPI(api)
}
重新运行程序,看看发生了什么。
看起来程序不再检查API了,对吗? 显示的内容可能与以下输出类似:
Done! It took 1.506e-05 seconds!
速度可真快! 发生了什么情况? 你会看到最后一条消息,指出程序已完成,因为Go为循环中的每个站点创建了一个goroutine,并且它立即转到下一行。
即使看起来checkAPI函数没有运行,它实际上是在运行。 它只是没有时间完成。 请注意,如果在循环之后添加一个睡眠计时器会发生什么,如下所示:
for _, api := range apis {
    go checkAPI(api)
}

time.Sleep(3 * time.Second)
现在,重新运行程序时,可能会看到如下所示的输出:
ERROR: https://api.somewhereintheinternet.com/ is down!
SUCCESS: https://api.github.com is up and running!
SUCCESS: https://management.azure.com is up and running!
SUCCESS: https://dev.azure.com is up and running!
SUCCESS: https://outlook.office.com/ is up and running!
SUCCESS: https://graph.microsoft.com is up and running!
Done! It took 3.002114573 seconds!
看起来似乎起作用了,对吧? 不完全如此。 如果你想在列表中添加一个新站点呢? 也许三秒钟是不够的。 你怎么知道? 你无法管理。 必须有更好的方法,这就是我们在下一节讨论channel时要涉及的内容。
Go语言教程之边写边学:基础练习:并发最佳实践

Go中的并发性是程序同时执行多个任务的能力。Go通过goroutines提供对并发的内置支持,goroutines是由Go运行时管理的轻量级线程。Goroutines允许您并发执行任务,而无需直接管理线程。Go中的并发性被设计为高效且可扩展,使其非常适合构建高性能和分布式系统。凭借对并发的支持,Go使开发人员能够编写能够充分利用现代硬件并更有效地利用系统资源的程序。

 

并发性是Go的主要特性之一。以下是在Go中使用并发的一些最佳实践:

  • 使用goroutines代替线程:Goroutines是由Go运行时管理的轻量级线程,这使得它们比传统线程更高效。使用它们可以并发执行任务并提高性能。
  • 使用通道在goroutines之间进行通信:通道是Go中goroutines之间通信的主要方式。使用它们在并发操作之间安全地传递数据。
  • 避免在goroutine之间共享可变数据:为了避免竞争条件和其他同步问题,请尽量避免在goroutines之间共享可变数据。相反,请根据需要使用通道传递数据副本。
  • 使用同步包进行同步:同步包提供同步基元,如互斥锁和等待组,可用于协调对共享资源的访问。
  • 使用select语句协调通道操作:select语句允许您等待多个通道操作一次完成,使其成为协调并发操作的强大工具。
  • 使用上下文包管理长时间运行的操作:上下文包提供了一种管理长时间运行的操作并在必要时取消它们的方法。使用它可避免无限期地阻塞通道或其他操作。

通过遵循这些最佳实践,您可以编写高效、安全且易于维护的并发代码。

Go语言教程之边写边学:并发 Concurrency

并发是程序同时执行多项操作的能力。这意味着具有两个或多个任务的程序,这些任务几乎同时单独运行,但仍然是同一程序的一部分。并发性在现代软件中非常重要,因为需要在不干扰程序整体流程的情况下尽可能快地执行独立的代码片段。

Golang中的并发性是函数彼此独立运行的能力。goroutine是一个能够与其他函数同时运行的函数。将函数创建为goroutine时,它被视为一个独立的工作单元,该工作单元在可用的逻辑处理器上调度和执行。Golang运行时调度器具有管理所有创建并需要处理器时间的goroutines的功能。调度程序将操作系统的线程绑定到逻辑处理器,以便执行goroutine。通过位于操作系统之上,调度程序可以控制与在任何给定时间在哪些逻辑处理器上运行哪些goroutines相关的所有内容。

流行的编程语言(如Java和Python)通过使用线程实现并发性。Golang具有内置的并发结构:goroutines和channels。Golang中的并发既便宜又简单。Goroutines是廉价、轻量级的线程。通道是允许 goroutines之间进行通信的管道。

通信顺序进程(简称 CSP)用于描述具有多个并发模型的系统应如何相互交互。它通常严重依赖使用通道作为在两个或多个并发进程之间传递消息的媒介,并且是Golang的基本口头禅。

Goroutines—goroutines是一个独立于启动它的函数运行的函数。
Channel—通道是用于发送和接收数据的管道。通道为一个goroutine提供了一种将结构化数据发送到另一个goroutine的方法。

当您检查多任务处理时,并发性和并行性就会出现,它们通常可以互换使用,并发和并行是指相关但不同的东西。

并发-并发即将同时处理多个任务。这意味着您正在努力管理在给定时间段内同时完成的众多任务。但是,您一次只能执行一项任务。这往往发生在一个任务正在等待并且程序决定在空闲时间内驱动另一个任务的程序中。这是问题域的一个方面-您的程序需要处理许多同时发生的事件。

并行性-并行性是一次执行许多任务。这意味着即使我们有两个任务,它们也会持续工作,它们之间没有任何中断。这是解决方案域的一个方面-您希望通过并行处理问题的不同部分来加快程序。

并发程序具有多个逻辑控制线程。这些线程可能会也可能不会并行运行。通过同时(并行)执行计算的不同部分,并行程序可能比顺序程序运行得更快。它可能具有也可能没有多个逻辑控制线程。

 

哲学家就餐问题

五个沉默的哲学家坐在一张圆桌旁,端着一碗意大利面。叉子放在每对相邻的哲学家之间。每个哲学家都必须交替思考和吃饭。然而,哲学家只有在左右叉子都有时才能吃意大利面。每个叉子只能由一个哲学家握住,因此一个哲学家只有在另一个哲学家不使用叉子的情况下才能使用叉子。在个别哲学家吃完饭后,他们需要放下两把叉子,以便其他人可以使用叉子。哲学家可以在他们可用时拿走他们右边的叉子或左边的叉子,但在得到两个叉子之前不能开始吃东西。进食不受剩余意大利面或胃空间的限制;假设有无限的供应和无限的需求。问题是如何设计一个行为学科(并发算法),这样就不会有哲学家会饿死;也就是说,每个人都可以永远继续在吃饭和思考之间交替,假设没有哲学家知道别人什么时候想吃东西或想思考。

package main

import (
	"hash/fnv"
	"log"
	"math/rand"
	"os"
	"sync"
	"time"
)

// 哲学家数组
var ph = []string{"Mark", "Russell", "Rocky", "Haris", "Root"}

const hunger = 3                // 就餐次数
const think = time.Second / 100 // 思考时间
const eat = time.Second / 100   // 就餐时间

var fmt = log.New(os.Stdout, "", 0)

var dining sync.WaitGroup

func diningProblem(phName string, dominantHand, otherHand *sync.Mutex) {
	fmt.Println(phName, "Seated")
	h := fnv.New64a()
	h.Write([]byte(phName))
	rg := rand.New(rand.NewSource(int64(h.Sum64())))
	rSleep := func(t time.Duration) {
		time.Sleep(t/2 + time.Duration(rg.Int63n(int64(t))))
	}
	for h := hunger; h > 0; h-- {
		fmt.Println(phName, "Hungry")
		dominantHand.Lock() // 就餐
		otherHand.Lock()
		fmt.Println(phName, "Eating")
		rSleep(eat)
		dominantHand.Unlock() // 就餐结束
		otherHand.Unlock()
		fmt.Println(phName, "Thinking")
		rSleep(think)
	}
	fmt.Println(phName, "Satisfied")
	dining.Done()
	fmt.Println(phName, "Left the table")
}

func main() {
	fmt.Println("Table empty")
	dining.Add(5)
	fork0 := &sync.Mutex{}
	forkLeft := fork0
	for i := 1; i < len(ph); i++ {
		forkRight := &sync.Mutex{}
		go diningProblem(ph[i], forkLeft, forkRight)
		forkLeft = forkRight
	}
	go diningProblem(ph[0], fork0, forkLeft)
	dining.Wait() // wait for philosphers to finish
	fmt.Println("Table empty")
}

输出

Table empty
Mark seated
Mark Hungry
Mark Eating
..................
..................
Haris Thinking
Haris Satisfied
Haris Left the table
Table empty

 

检查点同步问题

检查点同步是同步多个任务的问题。考虑一个车间,几个工人组装一些机制的细节。当他们每个人都完成他的工作时,他们会把细节放在一起。没有商店,所以首先完成部分的工人必须等待其他人才能开始另一个部分。将细节放在一起是任务在分开路径之前进行自我同步的检查点。

package main

import (
	"log"
	"math/rand"
	"sync"
	"time"
)

func worker(part string) {
	log.Println(part, "worker begins part")
	time.Sleep(time.Duration(rand.Int63n(1e6)))
	log.Println(part, "worker completes part")
	wg.Done()
}

var (
	partList    = []string{"A", "B", "C", "D"}
	nAssemblies = 3
	wg          sync.WaitGroup
)

func main() {
	rand.Seed(time.Now().UnixNano())
	for c := 1; c <= nAssemblies; c++ {
		log.Println("begin assembly cycle", c)
		wg.Add(len(partList))
		for _, part := range partList {
			go worker(part)
		}
		wg.Wait()
		log.Println("assemble.  cycle", c, "complete")
	}
}

输出

2019/07/15 16:10:32 begin assembly cycle 1
2019/07/15 16:10:32 D worker begins part
2019/07/15 16:10:32 A worker begins part
2019/07/15 16:10:32 B worker begins part
........
2019/07/15 16:10:32 D worker completes part
2019/07/15 16:10:32 C worker completes part
2019/07/15 16:10:32 assemble.  cycle 3 complete

 

生产者消费者问题

该问题描述了两个进程,即生产者和使用者,它们共享一个用作队列的通用固定大小缓冲区。生产者的工作是生成数据,将其放入缓冲区,然后重新开始。同时,使用者正在消耗数据(即,将其从缓冲区中删除),一次一个片段。问题是要确保生成者不会尝试将数据添加到缓冲区中(如果缓冲区已满),并且使用者不会尝试从空缓冲区中删除数据。生产者的解决方案是进入睡眠状态,或者在缓冲区已满时丢弃数据。下次使用者从缓冲区中删除项目时,它会通知生产者,生产者再次开始填充缓冲区。同样,如果使用者发现缓冲区为空,则可以进入睡眠状态。下次生产者将数据放入缓冲区时,它会唤醒沉睡的使用者。

package main

import (
	"flag"
	"fmt"
	"log"
	"os"
	"runtime"
	"runtime/pprof"
)

// 消费者
type Consumer struct {
	msgs *chan int
}

// 新建消费者
func NewConsumer(msgs *chan int) *Consumer {
	return &Consumer{msgs: msgs}
}

// 消费者消费消息
func (c *Consumer) consume() {
	fmt.Println("consume: Started")
	for {
		msg := <-*c.msgs
		fmt.Println("consume: Received:", msg)
	}
}

// 生产者
type Producer struct {
	msgs *chan int
	done *chan bool
}

// 新建生产者
func NewProducer(msgs *chan int, done *chan bool) *Producer {
	return &Producer{msgs: msgs, done: done}
}

// 生产消息
func (p *Producer) produce(max int) {
	fmt.Println("produce: Started")
	for i := 0; i < max; i++ {
		fmt.Println("produce: Sending ", i)
		*p.msgs <- i
	}
	*p.done <- true // signal when done
	fmt.Println("produce: Done")
}

func main() {
	// cpu和内存profile信息保存到文件中
	cpuprofile := flag.String("cpuprofile", "", "write cpu profile to `file`")
	memprofile := flag.String("memprofile", "", "write memory profile to `file`")

	// get the maximum number of messages from flags
	max := flag.Int("n", 5, "defines the number of messages")

	flag.Parse()

	// 使用所有可用cpu
	runtime.GOMAXPROCS(runtime.NumCPU())

	// CPU Profile
	if *cpuprofile != "" {
		f, err := os.Create(*cpuprofile)
		if err != nil {
			log.Fatal("could not create CPU profile: ", err)
		}
		if err := pprof.StartCPUProfile(f); err != nil {
			log.Fatal("could not start CPU profile: ", err)
		}
		defer pprof.StopCPUProfile()
	}

	var msgs = make(chan int)  // 发送和接收消息的管道
	var done = make(chan bool) // 生产者是否完成工作的消息通道

	// 新建生产者并生产消息
	go NewProducer(&msgs, &done).produce(*max)

	// 新建消费者并消费消息
	go NewConsumer(&msgs).consume()

	// 当生产者生产完消息后结束程序
	<-done

	// Memory Profile
	if *memprofile != "" {
		f, err := os.Create(*memprofile)
		if err != nil {
			log.Fatal("could not create memory profile: ", err)
		}
		runtime.GC() // 启动垃圾回收器
		if err := pprof.WriteHeapProfile(f); err != nil {
			log.Fatal("could not write memory profile: ", err)
		}
		f.Close()
	}
}

输出

consume: Started
produce: Started
produce: Sending  0
produce: Sending  1
consume: Received: 0
consume: Received: 1
produce: Sending  2
produce: Sending  3
consume: Received: 2
consume: Received: 3
produce: Sending  4
produce: Done

 

理发师的睡觉问题

理发师在剪裁室里有一把理发椅,还有一个等候室,里面有几把椅子。当理发师剪完顾客的头发后,他解雇了顾客,然后去等候室看看是否有其他人在等。如果有,他把其中一个带回椅子上,剪掉他们的头发。如果没有,他就回到椅子上睡在椅子上。每个顾客到达时,都会看看理发师在做什么。如果理发师正在睡觉,顾客会叫醒他,坐在剪裁室的椅子上。如果理发师正在剪头发,顾客就会留在候诊室。如果候诊室里有免费的椅子,顾客就会坐在里面等待轮到他们。如果没有空闲的椅子,客户就会离开。

package main

import (
	"fmt"
	"sync"
	"time"
)

const (
	sleeping = iota
	checking
	cutting
)

var stateLog = map[int]string{
	0: "Sleeping",
	1: "Checking",
	2: "Cutting",
}
var wg *sync.WaitGroup // 潜在客户数量

type Barber struct {
	name string
	sync.Mutex
	state    int // Sleeping/Checking/Cutting
	customer *Customer
}

type Customer struct {
	name string
}

func (c *Customer) String() string {
	return fmt.Sprintf("%p", c)[7:]
}

func NewBarber() (b *Barber) {
	return &Barber{
		name:  "Sam",
		state: sleeping,
	}
}

// 理发师goroutine
// 检查等候室客户
// 睡觉-等待被叫醒
func barber(b *Barber, wr chan *Customer, wakers chan *Customer) {
	for {
		b.Lock()
		defer b.Unlock()
		b.state = checking
		b.customer = nil

		// 检查等候室
		fmt.Printf("Checking waiting room: %d\n", len(wr))
		time.Sleep(time.Millisecond * 100)
		select {
		case c := <-wr:
			HairCut(c, b)
			b.Unlock()
		default: // 当等候室没有客户时
			fmt.Printf("Sleeping Barber - %s\n", b.customer)
			b.state = sleeping
			b.customer = nil
			b.Unlock()
			c := <-wakers
			b.Lock()
			fmt.Printf("Woken by %s\n", c)
			HairCut(c, b)
			b.Unlock()
		}
	}
}

// 理发
func HairCut(c *Customer, b *Barber) {
	b.state = cutting
	b.customer = c
	b.Unlock()
	fmt.Printf("Cutting  %s hair\n", c)
	time.Sleep(time.Millisecond * 100)
	b.Lock()
	wg.Done()
	b.customer = nil
}

// 客户goroutine
// 如果等候室满了就离开, 否则再大厅等待
func customer(c *Customer, b *Barber, wr chan<- *Customer, wakers chan<- *Customer) {
	// 到达
	time.Sleep(time.Millisecond * 50)
	// 检查理发店
	b.Lock()
	fmt.Printf("Customer %s checks %s barber | room: %d, w %d - customer: %s\n",
		c, stateLog[b.state], len(wr), len(wakers), b.customer)
	switch b.state {
	case sleeping:
		select {
		case wakers <- c:
		default:
			select {
			case wr <- c:
			default:
				wg.Done()
			}
		}
	case cutting:
		select {
		case wr <- c:
		default: // 等候室满了就离开
			wg.Done()
		}
	case checking:
		panic("Customer shouldn't check for the Barber when Barber is Checking the waiting room")
	}
	b.Unlock()
}

func main() {
	b := NewBarber()
	b.name = "Rocky"
	WaitingRoom := make(chan *Customer, 5) // 5把椅子
	Wakers := make(chan *Customer, 1)      // 每次接待一个客户
	go barber(b, WaitingRoom, Wakers)

	time.Sleep(time.Millisecond * 100)
	wg = new(sync.WaitGroup)
	n := 10
	wg.Add(10)
	// 生产客户
	for i := 0; i < n; i++ {
		time.Sleep(time.Millisecond * 50)
		c := new(Customer)
		go customer(c, b, WaitingRoom, Wakers)
	}

	wg.Wait()
	fmt.Println("No more customers for the day")
}

输出

Checking waiting room: 0
Sleeping Barber - 
Customer 120 checks Sleeping barber | room: 0, w 0 - customer: 
Woken by 120
..............
..............
Checking waiting room: 0
No more customers for the day

 

吸烟者问题

假设一支香烟需要三种成分来制作和吸烟:烟草、纸张和火柴。一张桌子周围有三个吸烟者,每个人都有三种成分中的一种无限供应——一个吸烟者有无限的烟草供应,另一个有纸,第三个有火柴。有一个第四者,拥有无限供应的一切,随机选择一个吸烟者,并将香烟所需的另外两种用品放在桌子上,让被选择的吸烟者吸烟,该过程应无限期重复。

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

const (
	paper = iota
	grass
	match
)

var smokeMap = map[int]string{
	paper: "paper",
	grass: "grass",
	match: "match",
}

var names = map[int]string{
	paper: "Sandy",
	grass: "Apple",
	match: "Daisy",
}

type Table struct {
	paper chan int
	grass chan int
	match chan int
}

func arbitrate(t *Table, smokers [3]chan int) {
	for {
		time.Sleep(time.Millisecond * 500)
		next := rand.Intn(3)
		fmt.Printf("Table chooses %s: %s\n", smokeMap[next], names[next])
		switch next {
		case paper:
			t.grass <- 1
			t.match <- 1
		case grass:
			t.paper <- 1
			t.match <- 1
		case match:
			t.grass <- 1
			t.paper <- 1
		}
		for _, smoker := range smokers {
			smoker <- next
		}
		wg.Add(1)
		wg.Wait()
	}
}

func smoker(t *Table, name string, smokes int, signal chan int) {
	var chosen = -1
	for {
		chosen = <-signal // blocks

		if smokes != chosen {
			continue
		}

		fmt.Printf("Table: %d grass: %d match: %d\n", len(t.paper), len(t.grass), len(t.match))
		select {
		case <-t.paper:
		case <-t.grass:
		case <-t.match:
		}
		fmt.Printf("Table: %d grass: %d match: %d\n", len(t.paper), len(t.grass), len(t.match))
		time.Sleep(10 * time.Millisecond)
		select {
		case <-t.paper:
		case <-t.grass:
		case <-t.match:
		}
		fmt.Printf("Table: %d grass: %d match: %d\n", len(t.paper), len(t.grass), len(t.match))
		fmt.Printf("%s smokes a cigarette\n", name)
		time.Sleep(time.Millisecond * 500)
		wg.Done()
		time.Sleep(time.Millisecond * 100)
	}
}

const LIMIT = 1

var wg *sync.WaitGroup

func main() {
	wg = new(sync.WaitGroup)
	table := new(Table)
	table.match = make(chan int, LIMIT)
	table.paper = make(chan int, LIMIT)
	table.grass = make(chan int, LIMIT)
	var signals [3]chan int
	// three smokers
	for i := 0; i < 3; i++ {
		signal := make(chan int, 1)
		signals[i] = signal
		go smoker(table, names[i], i, signal)
	}
	fmt.Printf("%s, %s, %s, sit with \n%s, %s, %s\n\n", names[0], names[1], names[2], smokeMap[0], smokeMap[1], smokeMap[2])
	arbitrate(table, signals)
}

输出

Sandy, Apple, Daisy, sit with
paper, grass, match

Table chooses match: Daisy
Table: 1 grass: 1 match: 0
Table: 1 grass: 0 match: 0
Table: 0 grass: 0 match: 0
Daisy smokes a cigarette
Table chooses paper: Sandy
Table: 0 grass: 1 match: 1
Table: 0 grass: 1 match: 0
Table: 0 grass: 0 match: 0
Sandy smokes a cigarette
Table chooses match: Daisy
Table: 1 grass: 1 match: 0
Table: 1 grass: 0 match: 0
Table: 0 grass: 0 match: 0
Daisy smokes a cigarette
用PHP fiber能不能达到go的并发性能?

答案是可能的,但也取决于具体应用场景。

在理论上,PHP fiber 和 Go 的并发性能是相同的。PHP fiber 是一种协程,而 Go 也是一种协程语言。协程是一种轻量级的线程,可以通过上下文切换来实现并发。在相同的硬件环境下,PHP fiber 和 Go 可以处理相同数量的并发任务。

但是,在实际应用中,PHP fiber 和 Go 的并发性能可能存在差异。PHP fiber 是基于原生 PHP 实现的,而 Go 是专门为并发编程设计的语言。Go 提供了更加优化的并发原语和库,这可以提高 Go 的并发性能。

此外,PHP fiber 和 Go 的编码方式也可能会影响并发性能。PHP fiber 的编码方式与传统的 PHP 编码方式非常相似,这使得开发者可以更容易地使用 PHP fiber。但是,PHP fiber 的编码方式也可能存在一些缺陷,例如,PHP fiber 的上下文切换可能会导致性能损失。

一些可以提高 PHP fiber 并发性能的建议:

  • 使用优化的协程库。
  • 避免使用会导致上下文切换的操作。
  • 使用高效的编码方式。
  • 当前日期:
  • 北京时间:
  • 时间戳:
  • 今年的第:18周
  • 我的 IP:18.116.118.216
农历
五行
冲煞
彭祖
方位
吉神
凶神
极简任务管理 help
+ 0 0 0
Task Idea Collect