Loading tickets/tickets_test.go +12 −12 Original line number Diff line number Diff line Loading @@ -80,38 +80,38 @@ func TestTicketsComplex(t *testing.T) { }(maxTickets) // dispatch work to distinct workers workerStreams := make([]chan WorkerResult, numWorkers) for worker := 0; worker < numWorkers; worker++ { workerStreams[worker] = func(id int, tickets chan *Ticket) chan WorkerResult { result := make(chan WorkerResult) workerResultStreamArray := make([]chan WorkerResult, numWorkers) for workerIndex := 0; workerIndex < numWorkers; workerIndex++ { workerResultStreamArray[workerIndex] = func(id int, ticketStream chan *Ticket) chan WorkerResult { workerResultStream := make(chan WorkerResult) count := 0 go func() { t.Logf("worker %d started", id) defer func() { result <- WorkerResult{ID: id, Count: count} close(result) workerResultStream <- WorkerResult{ID: id, Count: count} close(workerResultStream) t.Logf("worker %d terminated", id) }() for ticket := range tickets { for ticket := range ticketStream { ticket.Process() count++ t.Logf("worker %d processed ticket %d (duration %s)", id, ticket.ID, ticket.Duration.String()) } }() return result }(worker, ticketStream) return workerResultStream }(workerIndex, ticketStream) } // collect results from all workers workerResultStream := make(chan WorkerResult) go func() { defer close(workerResultStream) for _, resultStream := range workerStreams { for _, indexedWorkerResultStream := range workerResultStreamArray { select { case result := <-resultStream: workerResultStream <- result case workerResult := <-indexedWorkerResultStream: workerResultStream <- workerResult } } }() Loading Loading
tickets/tickets_test.go +12 −12 Original line number Diff line number Diff line Loading @@ -80,38 +80,38 @@ func TestTicketsComplex(t *testing.T) { }(maxTickets) // dispatch work to distinct workers workerStreams := make([]chan WorkerResult, numWorkers) for worker := 0; worker < numWorkers; worker++ { workerStreams[worker] = func(id int, tickets chan *Ticket) chan WorkerResult { result := make(chan WorkerResult) workerResultStreamArray := make([]chan WorkerResult, numWorkers) for workerIndex := 0; workerIndex < numWorkers; workerIndex++ { workerResultStreamArray[workerIndex] = func(id int, ticketStream chan *Ticket) chan WorkerResult { workerResultStream := make(chan WorkerResult) count := 0 go func() { t.Logf("worker %d started", id) defer func() { result <- WorkerResult{ID: id, Count: count} close(result) workerResultStream <- WorkerResult{ID: id, Count: count} close(workerResultStream) t.Logf("worker %d terminated", id) }() for ticket := range tickets { for ticket := range ticketStream { ticket.Process() count++ t.Logf("worker %d processed ticket %d (duration %s)", id, ticket.ID, ticket.Duration.String()) } }() return result }(worker, ticketStream) return workerResultStream }(workerIndex, ticketStream) } // collect results from all workers workerResultStream := make(chan WorkerResult) go func() { defer close(workerResultStream) for _, resultStream := range workerStreams { for _, indexedWorkerResultStream := range workerResultStreamArray { select { case result := <-resultStream: workerResultStream <- result case workerResult := <-indexedWorkerResultStream: workerResultStream <- workerResult } } }() Loading