Commit b9d92cf4 authored by Boris Mühmer's avatar Boris Mühmer
Browse files

snapshot of added examples



Signed-off-by: default avatarBoris Mühmer <boris@muehmer.de>
parent 0c0cef7e
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
package concurrency
+79 −0
Original line number Diff line number Diff line
package concurrency

import "testing"

func TestConcurrencyBasic001(t *testing.T) {
	stringStream := make(chan string)
	close(stringStream)
	go func() {
		stringStream <- "Hello from goroutine!"
	}()

	salutation, ok := <-stringStream
	t.Logf("status \"%t\": \"%s\"", ok, salutation)

	salutation, ok = <-stringStream
	t.Logf("status \"%t\": \"%s\"", ok, salutation)
}

func TestConcurrencyBasic002(t *testing.T) {
	stringStream := make(chan string)
	defer close(stringStream)
	go func() {
		stringStream <- "Hello from goroutine!"
	}()

	salutation, ok := <-stringStream
	t.Logf("status \"%t\": \"%s\"", ok, salutation)

	salutation, ok = <-stringStream
	t.Logf("status \"%t\": \"%s\"", ok, salutation)
}

func TestConcurrencyBasic003(t *testing.T) {
	stringStream := make(chan string)
	go func() {
		defer close(stringStream)
		stringStream <- "Hello from goroutine!"
	}()

	salutation, ok := <-stringStream
	t.Logf("status \"%t\": \"%s\"", ok, salutation)

	salutation, ok = <-stringStream
	t.Logf("status \"%t\": \"%s\"", ok, salutation)
}
func TestConcurrencyBasic004(t *testing.T) {
	intStream := make(chan int)
	go func() {
		defer close(intStream)
		for i := 0; i < 10; i++ {
			intStream <- i
		}
	}()

	for j := range intStream {
		t.Logf("j: %d", j)
	}
}
func TestConcurrencyBasic005(t *testing.T) {

}
func TestConcurrencyBasic006(t *testing.T) {

}
func TestConcurrencyBasic007(t *testing.T) {

}
func TestConcurrencyBasic008(t *testing.T) {

}
func TestConcurrencyBasic009(t *testing.T) {

}
func TestConcurrencyBasic010(t *testing.T) {

}
func TestConcurrencyBasic011(t *testing.T) {

}

tickets/tickets.go

0 → 100644
+90 −0
Original line number Diff line number Diff line
package tickets

import (
	"math/rand"
	"sync"
	"time"
)

// Ticket simulates a ticket which needs a certain time to process.
type Ticket struct {
	ID       int
	Duration time.Duration
}

// NewTicket creates a new ticket.
func NewTicket(id int, duration time.Duration) *Ticket {
	return &Ticket{
		ID:       id,
		Duration: duration,
	}
}

// RandomTickets returns a stream of tickets.
func RandomTickets(maxTickets int, maxDuration time.Duration) <-chan *Ticket {
	ticketStream := make(chan *Ticket)

	go func() {
		defer close(ticketStream)

		for id := 0; id < maxTickets; id++ {
			d := time.Duration(rand.Int63n(maxDuration.Nanoseconds())) * time.Nanosecond
			ticket := NewTicket(id, d)
			ticketStream <- ticket
		}
	}()

	return ticketStream
}

// Process executes a ticket.
func (t *Ticket) Process() {
	time.Sleep(t.Duration)
}

// TicketProcessor works on a ticket.
func TicketProcessor(done <-chan bool, inStream <-chan *Ticket) <-chan *Ticket {
	outStream := make(chan *Ticket)
	go func() {
		defer close(outStream)
		for {
			select {
			case <-done:
				return
			case ticket := <-inStream:
				ticket.Process()
				outStream <- ticket
			}
		}
	}()
	return outStream
}

// TicketCollector merges the results of TicketProcessors.
func TicketCollector(done <-chan bool, workers ...<-chan *Ticket) <-chan *Ticket {
	var wg sync.WaitGroup
	outStream := make(chan *Ticket)

	multiplex := func(c <-chan *Ticket) {
		defer wg.Done()
		for ticket := range c {
			select {
			case <-done:
				return
			case outStream <- ticket:
			}
		}
	}

	wg.Add(len(workers))
	for _, w := range workers {
		go multiplex(w)
	}

	go func() {
		wg.Wait()
		close(outStream)
	}()

	return outStream
}
+130 −0
Original line number Diff line number Diff line
package tickets

import (
	"math/rand"
	"runtime"
	"testing"
	"time"
)

var (
	maxTickets  = 10
	maxDuration = 1 * time.Second
	maxWorkers  = runtime.NumCPU()
)

func TestTicketsSimple(t *testing.T) {
	start := time.Now()
	for ticket := range RandomTickets(maxTickets, maxDuration) {
		ticket.Process()
	}
	finished := time.Now()
	elapsed := finished.Sub(start)
	t.Logf("processed %d ticket(s) in %s", maxTickets, elapsed.String())
}

func TestTicketsParallel(t *testing.T) {
	t.Skip("NOT WORKING RIGHT NOW")

	done := make(chan bool)
	defer close(done)

	duration := time.Duration(0)

	start := time.Now()

	tickets := RandomTickets(maxTickets, maxDuration)
	workers := make([]<-chan *Ticket, maxWorkers)
	for worker := 0; worker < maxWorkers; worker++ {
		workers[worker] = TicketProcessor(done, tickets)
	}
	for ticket := range TicketCollector(done, workers...) {
		//t.Logf("ticket %d finished", ticket.ID)
		duration += ticket.Duration
	}

	finished := time.Now()
	elapsed := finished.Sub(start)
	t.Logf("processed %d ticket(s) in %s - ticket duration sum: %s", maxTickets, elapsed.String(), duration.String())
}

func TestTicketsComplex(t *testing.T) {

	type WorkerResult struct {
		ID, Count int
	}

	maxTickets := 200
	numWorkers := runtime.NumCPU()
	numBuffered := 50 // 0 for an unbuffered channel

	t.Logf("number of tickets: %d", maxTickets)
	t.Logf("numWorkers: %d", numWorkers)

	// generate some tickets
	totalProcessDuration := time.Duration(0)
	ticketStream := make(chan *Ticket, numBuffered)
	go func(numTickets int) {
		defer func() {
			close(ticketStream)
			t.Logf("ticketStream closed")
		}()
		t.Logf("number of tickets to generate: %d", numTickets)
		for i := 0; i < numTickets; i++ {
			//d := 200 * time.Millisecond
			d := time.Duration(100+rand.Int63n(400)) * time.Millisecond
			ticket := NewTicket(i, d)
			totalProcessDuration += d
			t.Logf("prepared ticket: %d / %s", ticket.ID, ticket.Duration.String())
			ticketStream <- ticket
		}
	}(maxTickets)

	// dispatch work to distinct goroutines
	workerResultStreamArray := make([]chan WorkerResult, numWorkers)
	for workerIndex := 0; workerIndex < numWorkers; workerIndex++ {
		workerResultStreamArray[workerIndex] = func(id int, ticketStream chan *Ticket) chan WorkerResult {
			workerResultStream := make(chan WorkerResult)
			count := 0

			go func() {
				t.Logf("worker %d started", id)
				defer func() {
					workerResultStream <- WorkerResult{ID: id, Count: count}
					close(workerResultStream)
					t.Logf("worker %d terminated", id)
				}()
				for ticket := range ticketStream {
					ticket.Process()
					count++
					t.Logf("worker %d processed ticket %d (duration %s)", id, ticket.ID, ticket.Duration.String())
				}
			}()

			return workerResultStream
		}(workerIndex, ticketStream)
	}

	// collect results from all goroutines
	workerResultStream := make(chan WorkerResult)
	go func() {
		defer close(workerResultStream)
		for _, indexedWorkerResultStream := range workerResultStreamArray {
			select {
			case workerResult := <-indexedWorkerResultStream:
				workerResultStream <- workerResult
			}
		}
	}()

	// evaluate results
	processedTickets := 0
	for i := range workerResultStream {
		t.Logf("worker %d processed %d", i.ID, i.Count)
		processedTickets += i.Count
	}
	t.Logf("processed tickets: %d - process duration: %s", processedTickets, totalProcessDuration.String())
	if maxTickets != processedTickets {
		t.Errorf("the count of processed tickets is %d, expected %d", processedTickets, maxTickets)
	}
}

workload/workload.go

0 → 100644
+111 −0
Original line number Diff line number Diff line
package workload

import (
	"sync"
)

// RepeatFn ...
func RepeatFn(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
	valueStream := make(chan interface{})
	go func() {
		defer close(valueStream)
		for {
			select {
			case <-done:
				return
			case valueStream <- fn():
			}
		}
	}()
	return valueStream
}

// Take ...
func Take(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
	takeStream := make(chan interface{})
	go func() {
		defer close(takeStream)
		for i := 0; i < num; i++ {
			select {
			case <-done:
				return
			case takeStream <- <-valueStream:
			}
		}
	}()
	return takeStream
}

// ToInt ...
func ToInt(done <-chan interface{}, valueStream <-chan interface{}) <-chan int {
	intStream := make(chan int)
	go func() {
		defer close(intStream)
		for v := range valueStream {
			select {
			case <-done:
				return
			case intStream <- v.(int):
			}
		}
	}()
	return intStream
}

// PrimeFinder ...
func PrimeFinder(done <-chan interface{}, intStream <-chan int) <-chan interface{} {
	primeStream := make(chan interface{})
	go func() {
		defer close(primeStream)
		for integer := range intStream {
			integer--
			prime := true
			for divisor := integer - 1; divisor > 1; divisor-- {
				if integer%divisor == 0 {
					prime = false
					break
				}
			}

			if prime {
				select {
				case <-done:
					return
				case primeStream <- integer:
				}
			}
		}
	}()
	return primeStream
}

// FanIn ...
func FanIn(done <-chan interface{}, channels ...<-chan interface{}) <-chan interface{} {
	var wg sync.WaitGroup // <2>
	multiplexedStream := make(chan interface{})

	multiplex := func(c <-chan interface{}) { // <3>
		defer wg.Done()
		for i := range c {
			select {
			case <-done:
				return
			case multiplexedStream <- i:
			}
		}
	}

	// Select from all the channels
	wg.Add(len(channels)) // <4>
	for _, c := range channels {
		go multiplex(c)
	}

	// Wait for all the reads to complete
	go func() { // <5>
		wg.Wait()
		close(multiplexedStream)
	}()

	return multiplexedStream
}
Loading