11.8 并发任务的控制
一、并发任务的控制
非阻塞等待
// noBlockingWait performs a non-blocking receive on c.
//
// It returns the received message and true when a value is immediately
// available. It returns "" and false when nothing is ready to be received,
// and also when c has been closed, so a closed channel is never mistaken for
// a delivered (empty) message.
func noBlockingWait(c chan string) (string, bool) {
	select {
	case m, ok := <-c:
		// ok is false only when c is closed and drained; m is "" in that case.
		return m, ok
	default:
		// Nothing ready right now: report "no message" instead of blocking.
		return "", false
	}
}

// Section heading kept from the original page: 超时机制 (timeout mechanism)
// timeoutWait waits for a message on c for at most timeout.
//
// It returns the message and true when one arrives in time. It returns ""
// and false when the timeout elapses first, and also when c has been closed,
// so a closed channel is never mistaken for a delivered (empty) message.
func timeoutWait(c chan string, timeout time.Duration) (string, bool) {
	// An explicit timer (instead of time.After) can be stopped as soon as a
	// message arrives, rather than staying pending for the whole timeout.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case m, ok := <-c:
		// ok is false only when c is closed and drained; m is "" in that case.
		return m, ok
	case <-timer.C:
		return "", false
	}
}

// Section heading kept from the original page: 任务中断/退出 (task interruption / exit)
// msgGen starts a goroutine that produces messages of the form
// "server <name>: message <i>" on the returned unbuffered channel, pausing a
// random duration in [0, 5000) milliseconds before each one.
//
// Receiving a value on done interrupts the generator: it prints a notice,
// simulates two seconds of cleanup, and exits. The returned channel is never
// closed.
//
// The send of each message is raced against done. Without that, a generator
// blocked on a send nobody is receiving could never observe done, and the
// caller's send on done would block forever.
func msgGen(name string, done chan struct{}) chan string {
	c := make(chan string)
	go func() {
		// cleanup simulates the teardown work performed on interruption.
		cleanup := func() {
			fmt.Println("异常中断,清理中。。。")
			time.Sleep(2 * time.Second)
			fmt.Println("清理完成")
		}

		for i := 0; ; i++ {
			select {
			case <-time.After(time.Duration(rand.Intn(5000)) * time.Millisecond):
				// Deliver the message, but stay responsive to an interrupt
				// while waiting for a receiver.
				select {
				case c <- fmt.Sprintf("server %s: message %d", name, i):
				case <-done:
					cleanup()
					return
				}
			case <-done: // interrupt requested while idle
				cleanup()
				return
			}
		}
	}()
	return c
}

// Section heading kept from the original page: 优雅退出 (graceful exit)
// msgGen starts a goroutine that produces messages of the form
// "server <name>: message <i>" on the returned unbuffered channel, pausing a
// random duration in [0, 5000) milliseconds before each one.
//
// done is used in both directions for a graceful exit: the caller sends one
// value to request shutdown, the generator prints a notice, simulates two
// seconds of cleanup, and then sends one value back on done to acknowledge
// that cleanup has finished. The caller must receive that acknowledgement,
// otherwise the goroutine stays blocked on it. The returned channel is never
// closed.
//
// The send of each message is raced against done. Without that, a generator
// blocked on a send nobody is receiving could never observe done, and the
// caller's send on done would block forever.
func msgGen(name string, done chan struct{}) chan string {
	c := make(chan string)
	go func() {
		// shutdown runs the simulated cleanup and then acknowledges on done.
		shutdown := func() {
			fmt.Println("异常中断,清理中。。。")
			time.Sleep(2 * time.Second)
			fmt.Println("清理完成")
			done <- struct{}{} // graceful exit: acknowledge over the same bidirectional channel
		}

		for i := 0; ; i++ {
			select {
			case <-time.After(time.Duration(rand.Intn(5000)) * time.Millisecond):
				// Deliver the message, but stay responsive to a shutdown
				// request while waiting for a receiver.
				select {
				case c <- fmt.Sprintf("server %s: message %d", name, i):
				case <-done:
					shutdown()
					return
				}
			case <-done: // shutdown requested while idle
				shutdown()
				return
			}
		}
	}()
	return c
}

// Section heading kept from the original page: 完整代码: (complete code)
// Command main demonstrates basic control of concurrent tasks with channels:
// non-blocking receive, receive with timeout, and interrupting a generator
// goroutine with a graceful, acknowledged exit.
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// msgGen starts a goroutine that produces messages of the form
// "server <name>: message <i>" on the returned unbuffered channel, pausing a
// random duration in [0, 5000) milliseconds before each one.
//
// done is used in both directions for a graceful exit: the caller sends one
// value to request shutdown, the generator prints a notice, simulates two
// seconds of cleanup, and then sends one value back on done to acknowledge
// that cleanup has finished. The returned channel is never closed.
//
// The send of each message is raced against done. Without that, a generator
// blocked on a send nobody is receiving could never observe done, and the
// caller's send on done would block forever.
func msgGen(name string, done chan struct{}) chan string {
	c := make(chan string)
	go func() {
		// shutdown runs the simulated cleanup and then acknowledges on done.
		shutdown := func() {
			fmt.Println("异常中断,清理中。。。")
			time.Sleep(2 * time.Second)
			fmt.Println("清理完成")
			done <- struct{}{} // graceful exit: acknowledge over the same bidirectional channel
		}

		for i := 0; ; i++ {
			select {
			case <-time.After(time.Duration(rand.Intn(5000)) * time.Millisecond):
				// Deliver the message, but stay responsive to a shutdown
				// request while waiting for a receiver.
				select {
				case c <- fmt.Sprintf("server %s: message %d", name, i):
				case <-done:
					shutdown()
					return
				}
			case <-done: // shutdown requested while idle
				shutdown()
				return
			}
		}
	}()
	return c
}

// fanIn merges any number of input channels into one output channel, using
// one forwarding goroutine per input. A forwarder stops once its input is
// closed instead of forwarding empty strings forever. The output channel is
// never closed.
func fanIn(chs ...chan string) chan string {
	c := make(chan string)
	for _, ch := range chs {
		go func(in chan string) {
			for m := range in {
				c <- m
			}
		}(ch)
	}
	return c
}

// fanInSelect merges exactly two input channels into one output channel
// using a single goroutine and select. A closed input is set to nil so that
// select no longer picks it; the goroutine exits once both inputs are gone.
// The output channel is never closed.
func fanInSelect(c1, c2 chan string) chan string {
	c := make(chan string)
	go func() {
		for c1 != nil || c2 != nil {
			select {
			case m, ok := <-c1:
				if !ok {
					c1 = nil // a nil channel is never ready, disabling this case
					continue
				}
				c <- m
			case m, ok := <-c2:
				if !ok {
					c2 = nil
					continue
				}
				c <- m
			}
		}
	}()
	return c
}

// noBlockingWait performs a non-blocking receive on c. It returns the message
// and true when a value is immediately available, and "" and false when
// nothing is ready or c has been closed.
func noBlockingWait(c chan string) (string, bool) {
	select {
	case m, ok := <-c:
		// ok is false only when c is closed and drained; m is "" in that case.
		return m, ok
	default:
		return "", false
	}
}

// timeoutWait waits for a message on c for at most timeout. It returns the
// message and true when one arrives in time, and "" and false when the
// timeout elapses first or c has been closed.
func timeoutWait(c chan string, timeout time.Duration) (string, bool) {
	// An explicit timer (instead of time.After) can be stopped as soon as a
	// message arrives, rather than staying pending for the whole timeout.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case m, ok := <-c:
		return m, ok
	case <-timer.C:
		return "", false
	}
}

// main polls one generator five times with a two-second timeout, then asks
// it to stop and waits for its cleanup acknowledgement.
func main() {
	done := make(chan struct{})
	// m1 := msgGen("m1", done)
	m2 := msgGen("m2", done)
	// m := fanIn(m1, m2)
	// m := fanInSelect(m1, m2)
	for i := 0; i < 5; i++ {
		// fmt.Println(<-m1)
		// if m, ok := noBlockingWait(m2); ok { // non-blocking wait
		if m, ok := timeoutWait(m2, 2*time.Second); ok { // wait with timeout
			fmt.Println(m)
		} else {
			fmt.Println("无消息")
		}
	}
	done <- struct{}{} // request a graceful exit
	<-done             // block until the generator reports cleanup is finished
}
Last updated
Was this helpful?