// UploadToS3 JSON-encodes the payload and is meant to store it in the S3
// bucket under a per-request key built from storageFolder plus a nanosecond
// timestamp (to avoid key collisions for identical timestamps).
// NOTE(review): this snippet is truncated by the article — the actual S3 PUT
// using `bucket`, `acl` and `contentType`, and the final return, are not shown.
func(p *Payload)UploadToS3()error { // the storageFolder method ensures that there are no name collision in // case we get same timestamp in the key name storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())
bucket := S3Bucket
// NOTE(review): `payload` is undefined in this scope — it should almost
// certainly be the receiver `p`; confirm against the original article.
b := new(bytes.Buffer) encodeErr := json.NewEncoder(b).Encode(payload) if encodeErr != nil { return encodeErr }
// Everything we post to the S3 bucket should be marked 'private' var acl = s3.Private var contentType = "application/octet-stream"
// payloadHandler accepts a POSTed JSON PayloadCollection and uploads each
// payload to S3. NOTE(review): truncated by the article — the function's
// closing brace and any success response are not shown; "funcpayloadHandler"
// is an extraction artifact of "func payloadHandler".
funcpayloadHandler(w http.ResponseWriter, r *http.Request) {
// Reject anything but POST.
if r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return }
// Decode the JSON body, capped at MaxLength bytes to bound memory use;
// malformed input gets a 400.
// Read the body into a string for json decoding var content = &PayloadCollection{} err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content) if err != nil { w.Header().Set("Content-Type", "application/json; charset=UTF-8") w.WriteHeader(http.StatusBadRequest) return }
// BUG (the article's point): one unbounded goroutine per payload — a traffic
// spike can exhaust memory/scheduler; errors from UploadToS3 are discarded,
// and pre-Go 1.22 the pointer receiver aliases the shared loop variable.
// Go through each payload and queue items individually to be posted to S3 for _, payload := range content.Payloads { go payload.UploadToS3() // <----- DON'T DO THIS }
需要注意的是,这只是伪代码,而且明显带着 bug,大家不要直接拿去用。比如经典的 for range 循环变量问题:在 Go 1.22 之前,range 的循环变量在整个循环期间是同一个变量,每次迭代只是把当前元素的值拷贝进去;而 goroutine 真正开始执行往往晚于循环的推进,所以在循环里起 goroutine 并(直接或通过指针接收者间接)引用循环变量时,很可能所有 goroutine 都读到最后一个元素。建议改用下标访问,或者在循环体内先做一份局部拷贝(payload := payload)再交给 goroutine。
// Illustrative pseudocode from the article (note the `...` placeholders):
// instead of spawning a goroutine per payload, each payload is enqueued onto
// a buffered channel (Queue) for a separate consumer to drain.
funcpayloadHandler(w http.ResponseWriter, r *http.Request) { ... // Go through each payload and queue items individually to be posted to S3 for _, payload := range content.Payloads { Queue <- payload } ... }
消费者
1 2 3 4 5 6 7 8
funcStartProcessor() { for { select { case job := <-Queue: job.payload.UploadToS3() // <-- STILL NOT GOOD } } }
// Start method starts the run loop for the worker, listening for a quit channel in // case we need to stop it func(w Worker)Start() { gofunc() { for { // register the current worker into the worker queue. w.WorkerPool <- w.JobChannel
select { case job := <-w.JobChannel: // we have received a work request. if err := job.Payload.UploadToS3(); err != nil { log.Errorf("Error uploading to S3: %s", err.Error()) }
case <-w.quit: // we have received a signal to stop return } } }() }
// Stop signals the worker to stop listening for work requests. func(w Worker)Stop() { gofunc() { w.quit <- true }() }
// payloadHandler (worker-pool version): accepts a POSTed JSON
// PayloadCollection and enqueues one Job per payload onto the global
// JobQueue for the dispatcher to hand to workers.
// NOTE(review): truncated by the article — the function's closing brace and
// any success response are not shown; "funcpayloadHandler" is an extraction
// artifact of "func payloadHandler".
funcpayloadHandler(w http.ResponseWriter, r *http.Request) {
// Reject anything but POST.
if r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return }
// Decode the JSON body, capped at MaxLength bytes; malformed input gets 400.
// Read the body into a string for json decoding var content = &PayloadCollection{} err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content) if err != nil { w.Header().Set("Content-Type", "application/json; charset=UTF-8") w.WriteHeader(http.StatusBadRequest) return }
// Go through each payload and queue items individually to be posted to S3 for _, payload := range content.Payloads {
// let's create a job with the payload work := Job{Payload: payload}
// Push the work onto the queue. Blocks (applying back-pressure) if JobQueue
// is full.
// Push the work onto the queue. JobQueue <- work }
// Run starts maxWorkers workers, each registering its job channel with the
// shared pool, then launches the dispatch loop that feeds them from the
// global JobQueue. (Spacing restored from the article's extraction-garbled
// "func(d *Dispatcher)Run()".)
func (d *Dispatcher) Run() {
	// starting n number of workers
	for i := 0; i < d.maxWorkers; i++ {
		worker := NewWorker(d.pool)
		worker.Start()
	}

	go d.dispatch()
}
func(d *Dispatcher)dispatch() { for { select { case job := <-JobQueue: // a job request has been received gofunc(job Job) { // try to obtain a worker job channel that is available. // this will block until a worker is idle jobChannel := <-d.WorkerPool
// dispatch the job to the worker job channel jobChannel <- job }(job) } } }
// NOTE(review): fragment — the enclosing method's header is not shown in the
// article. This body lazily initializes the pool and launches its consumers.
// Allocate the SubWorker slice on first use, sized to the configured maximum.
if w.SubWorkers == nil { w.SubWorkers = make([]SubWorker, w.MaxNum) }
// Lazily create the WaitGroup that Stop later waits on.
if w.Wg == nil { w.Wg = &sync.WaitGroup{} }
// Run every SubWorker, wiring each to the shared channel pool and the quit
// channel so Stop can shut them all down.
for i := 0; i < len(w.SubWorkers); i++ { w.SubWorkers[i].Run(w.Wg, w.ChPool, w.QuitChan) } }
启动并发池,将最大数量的 SubWorker 设置出来,并 Run 起这些消费者形成并发池。
Stop
1 2 3 4 5 6
func(w *W2)Stop() { close(w.QuitChan) w.Wg.Wait()
close(w.ChPool) }
结束——这是原作者没有考虑到的一点,而优雅地结束一个并发组件是极其重要的:如果每次启动的并发用完后都不收尾,就存在 goroutine 泄漏乃至内存泄漏的风险,拖累机器性能。这里的结束其实就是关闭 QuitChan,这个退出管道同时控制着消费者和生产者。我们看一下关闭它之后会发生什么:消费者会在等待任务时命中「管道已关闭」的 case 并退出协程;生产者也会在发送任务时命中该 case 退出协程。
这里有一个值得注意的点,生产者过多会阻塞在第一步获取 poolCh 并发管道的时候,而消费者的在触发关闭管道的 case 前是不会被阻塞的,所以我们只要等待消费者把所有任务做完并全部退出,之后将并发池管道关闭,对于一个关闭的管道尝试去获取他数据都会获取到该数据类型的零值,生产者获取关闭的并发管道池将会获取 nil chan 在尝试将任务放入该管道时会阻塞,此时将会触发关闭管道的 case。此时所有生产者也都会退出协程。
func(w *Worker)Do(){ for task := range w.WorkChan{ // 任务重复检查 if _,ok:= w.WorkQueue.LoadOrStore(task, struct {}{});!ok{ fmt.Println("work is doing",task) continue }
// 并发量控制 for atomic.LoadInt64(&w.Total) >= maxCurrency{ time.Sleep(1 * time.Second) }