Commit 81533c7f authored by Boris Mühmer's avatar Boris Mühmer
Browse files

added an example to process workload dipatched on various distinct workers

parent 25b02b0a
Loading
Loading
Loading
Loading

tickets/tickets.go

0 → 100644
+90 −0
Original line number Diff line number Diff line
package tickets

import (
	"math/rand"
	"sync"
	"time"
)

// Ticket simulates a ticket which needs a certain time to process.
type Ticket struct {
	ID       int
	Duration time.Duration
}

// NewTicket creates a new ticket.
func NewTicket(id int, duration time.Duration) *Ticket {
	return &Ticket{
		ID:       id,
		Duration: duration,
	}
}

// RandomTickets returns a stream of tickets.
func RandomTickets(maxTickets int, maxDuration time.Duration) <-chan *Ticket {
	ticketStream := make(chan *Ticket)

	go func() {
		defer close(ticketStream)

		for id := 0; id < maxTickets; id++ {
			d := time.Duration(rand.Int63n(maxDuration.Nanoseconds())) * time.Nanosecond
			ticket := NewTicket(id, d)
			ticketStream <- ticket
		}
	}()

	return ticketStream
}

// Process executes a ticket.
func (t *Ticket) Process() {
	time.Sleep(t.Duration)
}

// TicketProcessor works on a ticket.
func TicketProcessor(done <-chan bool, inStream <-chan *Ticket) <-chan *Ticket {
	outStream := make(chan *Ticket)
	go func() {
		defer close(outStream)
		for {
			select {
			case <-done:
				return
			case ticket := <-inStream:
				ticket.Process()
				outStream <- ticket
			}
		}
	}()
	return outStream
}

// TicketCollector merges the results of TicketProcessors.
func TicketCollector(done <-chan bool, workers ...<-chan *Ticket) <-chan *Ticket {
	var wg sync.WaitGroup
	outStream := make(chan *Ticket)

	multiplex := func(c <-chan *Ticket) {
		defer wg.Done()
		for ticket := range c {
			select {
			case <-done:
				return
			case outStream <- ticket:
			}
		}
	}

	wg.Add(len(workers))
	for _, w := range workers {
		go multiplex(w)
	}

	go func() {
		wg.Wait()
		close(outStream)
	}()

	return outStream
}
+129 −0
Original line number Diff line number Diff line
package tickets

import (
	"math/rand"
	"runtime"
	"testing"
	"time"
)

var (
	maxTickets  = 10
	maxDuration = 1 * time.Second
	maxWorkers  = runtime.NumCPU()
)

func TestTicketsSimple(t *testing.T) {
	start := time.Now()
	for ticket := range RandomTickets(maxTickets, maxDuration) {
		ticket.Process()
	}
	finished := time.Now()
	elapsed := finished.Sub(start)
	t.Logf("processed %d ticket(s) in %s", maxTickets, elapsed.String())
}

func TestTicketsParallel(t *testing.T) {
	t.Skip("NOT WORKING RIGHT NOW")

	done := make(chan bool)
	defer close(done)

	duration := time.Duration(0)

	start := time.Now()

	tickets := RandomTickets(maxTickets, maxDuration)
	workers := make([]<-chan *Ticket, maxWorkers)
	for worker := 0; worker < maxWorkers; worker++ {
		workers[worker] = TicketProcessor(done, tickets)
	}
	for ticket := range TicketCollector(done, workers...) {
		//t.Logf("ticket %d finished", ticket.ID)
		duration += ticket.Duration
	}

	finished := time.Now()
	elapsed := finished.Sub(start)
	t.Logf("processed %d ticket(s) in %s - ticket duration sum: %s", maxTickets, elapsed.String(), duration.String())
}

func TestTicketsComplex(t *testing.T) {

	type WorkerResult struct {
		ID, Count int
	}

	maxTickets := 200
	numWorkers := runtime.NumCPU()
	numBuffered := 50 // 0 for an unbuffered channel

	t.Logf("number of tickets: %d", maxTickets)
	t.Logf("numWorkers: %d", numWorkers)

	// gernerate some work
	var ticketStream chan *Ticket
	ticketStream = make(chan *Ticket, numBuffered)
	go func(numTickets int) {
		defer func() {
			close(ticketStream)
			t.Logf("ticketStream closed")
		}()
		t.Logf("number of tickets to generate: %d", numTickets)
		for i := 0; i < numTickets; i++ {
			//d := 200 * time.Millisecond
			d := time.Duration(100+rand.Int63n(400)) * time.Millisecond
			ticket := NewTicket(i, d)
			t.Logf("prepared ticket: %d / %s", ticket.ID, ticket.Duration.String())
			ticketStream <- ticket
		}
	}(maxTickets)

	// dispatch work to distinct workers
	workerStreams := make([]chan WorkerResult, numWorkers)
	for worker := 0; worker < numWorkers; worker++ {
		workerStreams[worker] = func(id int, tickets chan *Ticket) chan WorkerResult {
			result := make(chan WorkerResult)
			count := 0

			go func() {
				t.Logf("worker %d started", id)
				defer func() {
					result <- WorkerResult{ID: id, Count: count}
					close(result)
					t.Logf("worker %d terminated", id)
				}()
				for ticket := range tickets {
					ticket.Process()
					count++
					t.Logf("worker %d processed ticket %d (duration %s)", id, ticket.ID, ticket.Duration.String())
				}
			}()

			return result
		}(worker, ticketStream)
	}

	// collect results from all workers
	workerResultStream := make(chan WorkerResult)
	go func() {
		defer close(workerResultStream)
		for _, resultStream := range workerStreams {
			select {
			case result := <-resultStream:
				workerResultStream <- result
			}
		}
	}()

	// evaluate results
	processedTickets := 0
	for i := range workerResultStream {
		t.Logf("worker %d processed %d", i.ID, i.Count)
		processedTickets += i.Count
	}
	t.Logf("processed tickets: %d", processedTickets)
	if maxTickets != processedTickets {
		t.Errorf("the count of processed tickets is %d, expected %d", processedTickets, maxTickets)
	}
}