11.8 并发任务的控制

一、并发任务的控制

  1. 非阻塞等待

    // noBlockingWait polls c exactly once without blocking.
    // It returns the received value and true when a receive on c can proceed
    // immediately (a closed channel counts as ready and yields ""); otherwise
    // it returns "" and false.
    func noBlockingWait(c chan string) (string, bool) {
    	var (
    		msg string
    		got bool
    	)
    	select {
    	case msg = <-c:
    		got = true
    	default:
    		// Nothing is ready right now; keep the zero values.
    	}
    	return msg, got
    }
  2. 超时机制

    // timeoutWait waits up to timeout for a value from c.
    // It returns the value and true if a receive completes in time, or ""
    // and false once the timeout elapses.
    func timeoutWait(c chan string, timeout time.Duration) (string, bool) {
    	// Use an explicit timer instead of time.After so that it can be stopped
    	// as soon as a message arrives, rather than lingering until it fires.
    	timer := time.NewTimer(timeout)
    	defer timer.Stop()
    	select {
    	case m := <-c:
    		return m, true
    	case <-timer.C:
    		return "", false
    	}
    }
  3. 任务中断/退出

    // msgGen starts a goroutine that emits messages for the server called name
    // on the returned unbuffered channel, waiting a random delay of less than
    // five seconds before each one. Sending a value on done interrupts the
    // goroutine: it runs its simulated cleanup and exits. The returned channel
    // is never closed.
    func msgGen(name string, done chan struct{}) chan string {
    	c := make(chan string)
    	go func() {
    	produce:
    		for i := 0; ; i++ {
    			// Wait until the next message is due, unless interrupted first.
    			select {
    			case <-time.After(time.Duration(rand.Intn(5000)) * time.Millisecond):
    			case <-done: // interrupt
    				break produce
    			}
    			// Keep watching done while sending: with nobody receiving from c,
    			// a bare send would block forever and the interrupt would never
    			// be seen, deadlocking the caller that signals done.
    			select {
    			case c <- fmt.Sprintf("server %s:  message %d", name, i):
    			case <-done: // interrupt
    				break produce
    			}
    		}
    		fmt.Println("异常中断,清理中。。。")
    		time.Sleep(2 * time.Second) // simulated cleanup work
    		fmt.Println("清理完成")
    	}()
    	return c
    }
  4. 优雅退出

    // msgGen starts a goroutine that emits messages for the server called name
    // on the returned unbuffered channel, waiting a random delay of less than
    // five seconds before each one. Sending a value on done interrupts the
    // goroutine: it runs its simulated cleanup, then sends a value back on done
    // as an acknowledgement so the caller can wait for the cleanup to finish.
    // The returned channel is never closed.
    func msgGen(name string, done chan struct{}) chan string {
    	c := make(chan string)
    	go func() {
    	produce:
    		for i := 0; ; i++ {
    			// Wait until the next message is due, unless interrupted first.
    			select {
    			case <-time.After(time.Duration(rand.Intn(5000)) * time.Millisecond):
    			case <-done: // interrupt
    				break produce
    			}
    			// Keep watching done while sending: with nobody receiving from c,
    			// a bare send would block forever and the interrupt would never
    			// be seen, deadlocking the caller that signals done.
    			select {
    			case c <- fmt.Sprintf("server %s:  message %d", name, i):
    			case <-done: // interrupt
    				break produce
    			}
    		}
    		fmt.Println("异常中断,清理中。。。")
    		time.Sleep(2 * time.Second) // simulated cleanup work
    		fmt.Println("清理完成")
    		// Graceful exit: done is bidirectional, so reuse it to tell the
    		// caller that cleanup has completed.
    		done <- struct{}{}
    	}()
    	return c
    }
  5. 完整代码:

    package main
    
    import (
    	"fmt"
    	"math/rand"
    	"time"
    )
    
    // msgGen starts a goroutine that emits messages for the server called name
    // on the returned unbuffered channel, waiting a random delay of less than
    // five seconds before each one. Sending a value on done interrupts the
    // goroutine: it runs its simulated cleanup, then sends a value back on done
    // as an acknowledgement so the caller can wait for the cleanup to finish.
    // The returned channel is never closed.
    func msgGen(name string, done chan struct{}) chan string {
    	c := make(chan string)
    	go func() {
    	produce:
    		for i := 0; ; i++ {
    			// Wait until the next message is due, unless interrupted first.
    			select {
    			case <-time.After(time.Duration(rand.Intn(5000)) * time.Millisecond):
    			case <-done: // interrupt
    				break produce
    			}
    			// Keep watching done while sending: with nobody receiving from c,
    			// a bare send would block forever and the interrupt would never
    			// be seen, deadlocking the caller that signals done.
    			select {
    			case c <- fmt.Sprintf("server %s:  message %d", name, i):
    			case <-done: // interrupt
    				break produce
    			}
    		}
    		fmt.Println("异常中断,清理中。。。")
    		time.Sleep(2 * time.Second) // simulated cleanup work
    		fmt.Println("清理完成")
    		// Graceful exit: done is bidirectional, so reuse it to tell the
    		// caller that cleanup has completed.
    		done <- struct{}{}
    	}()
    	return c
    }
    
    // fanIn waits on several services at once: it merges every channel in chs
    // into the single returned channel, using one forwarding goroutine per
    // input. Each forwarder stops when its input is closed, and the returned
    // channel is closed once all forwarders have stopped (immediately when chs
    // is empty). A nil input never delivers or closes, so it keeps the output
    // open forever.
    func fanIn(chs ...chan string) chan string {
    	c := make(chan string)
    	finished := make(chan struct{})
    	for _, ch := range chs {
    		go func(in chan string) {
    			// Ranging ends when in is closed; a bare `c <- <-in` loop would
    			// instead forward empty strings forever after the close.
    			for m := range in {
    				c <- m
    			}
    			finished <- struct{}{}
    		}(ch)
    	}
    	go func() {
    		// Collect one completion signal per input before closing the output.
    		for range chs {
    			<-finished
    		}
    		close(c)
    	}()
    	return c
    }
    
    // fanInSelect waits on two services at once: a single goroutine selects
    // over c1 and c2 and forwards whatever arrives to the returned channel.
    // When an input is closed it is dropped from the select, and the returned
    // channel is closed once neither input remains (nil inputs count as already
    // finished).
    func fanInSelect(c1, c2 chan string) chan string {
    	c := make(chan string)
    	go func() {
    		defer close(c)
    		for c1 != nil || c2 != nil {
    			select {
    			case m, ok := <-c1:
    				if !ok {
    					// A nil channel is never ready in a select, so this
    					// removes the closed input instead of spinning on it.
    					c1 = nil
    					continue
    				}
    				c <- m
    			case m, ok := <-c2:
    				if !ok {
    					c2 = nil
    					continue
    				}
    				c <- m
    			}
    		}
    	}()
    	return c
    }
    
    // noBlockingWait polls c exactly once without blocking.
    // It returns the received value and true when a receive on c can proceed
    // immediately (a closed channel counts as ready and yields ""); otherwise
    // it returns "" and false.
    func noBlockingWait(c chan string) (string, bool) {
    	var (
    		msg string
    		got bool
    	)
    	select {
    	case msg = <-c:
    		got = true
    	default:
    		// Nothing is ready right now; keep the zero values.
    	}
    	return msg, got
    }
    
    // timeoutWait waits up to timeout for a value from c.
    // It returns the value and true if a receive completes in time, or ""
    // and false once the timeout elapses.
    func timeoutWait(c chan string, timeout time.Duration) (string, bool) {
    	// Use an explicit timer instead of time.After so that it can be stopped
    	// as soon as a message arrives, rather than lingering until it fires.
    	timer := time.NewTimer(timeout)
    	defer timer.Stop()
    	select {
    	case m := <-c:
    		return m, true
    	case <-timer.C:
    		return "", false
    	}
    }
    
    // main starts a single generator ("m2"), polls it five times with a
    // two-second timeout per attempt, then signals it to stop and waits for
    // the value it sends back on the same channel.
    func main() {
    	const attempts = 5
    	quit := make(chan struct{})
    	// m1 := msgGen("m1")
    	m2 := msgGen("m2", quit)
    	// m := fanIn(m1, m2)
    	// m := fanInSelect(m1, m2)

    	for attempt := 0; attempt < attempts; attempt++ {
    		// fmt.Println(<-m1)
    		// m, ok := noBlockingWait(m2) // non-blocking alternative
    		m, ok := timeoutWait(m2, 2*time.Second) // wait with a timeout
    		if !ok {
    			fmt.Println("无消息")
    			continue
    		}
    		fmt.Println(m)
    	}

    	// Ask the generator to stop.
    	quit <- struct{}{}
    	// Graceful exit: block until the generator signals back on the channel.
    	<-quit
    }

Last updated

Was this helpful?