Commit 67eef8b8 authored by Boris Mühmer's avatar Boris Mühmer
Browse files

added (modified) examples from "Concurrency in Go"

parent e2ffe848
Loading
Loading
Loading
Loading
+110 −0
Original line number Diff line number Diff line
package workload

import (
	"sync"
)

// RepeatFn ...
func RepeatFn(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
	valueStream := make(chan interface{})
	go func() {
		defer close(valueStream)
		for {
			select {
			case <-done:
				return
			case valueStream <- fn():
			}
		}
	}()
	return valueStream
}

// Take ...
func Take(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
	takeStream := make(chan interface{})
	go func() {
		defer close(takeStream)
		for i := 0; i < num; i++ {
			select {
			case <-done:
				return
			case takeStream <- <-valueStream:
			}
		}
	}()
	return takeStream
}

// ToInt ...
func ToInt(done <-chan interface{}, valueStream <-chan interface{}) <-chan int {
	intStream := make(chan int)
	go func() {
		defer close(intStream)
		for v := range valueStream {
			select {
			case <-done:
				return
			case intStream <- v.(int):
			}
		}
	}()
	return intStream
}

// PrimeFinder ...
func PrimeFinder(done <-chan interface{}, intStream <-chan int) <-chan interface{} {
	primeStream := make(chan interface{})
	go func() {
		defer close(primeStream)
		for integer := range intStream {
			integer--
			prime := true
			for divisor := integer - 1; divisor > 1; divisor-- {
				if integer%divisor == 0 {
					prime = false
					break
				}
			}

			if prime {
				select {
				case <-done:
					return
				case primeStream <- integer:
				}
			}
		}
	}()
	return primeStream
}

// FanIn ...
func FanIn(done <-chan interface{}, channels ...<-chan interface{}) <-chan interface{} {
	var wg sync.WaitGroup // <2>
	multiplexedStream := make(chan interface{})

	multiplex := func(c <-chan interface{}) { // <3>
		defer wg.Done()
		for i := range c {
			select {
			case <-done:
				return
			case multiplexedStream <- i:
			}
		}
	}

	// Select from all the channels
	wg.Add(len(channels)) // <4>
	for _, c := range channels {
		go multiplex(c)
	}

	// Wait for all the reads to complete
	go func() { // <5>
		wg.Wait()
		close(multiplexedStream)
	}()

	return multiplexedStream
}
+117 −2
Original line number Diff line number Diff line
package workload

import "testing"
import (
	"math/rand"
	"runtime"
	"sync"
	"testing"
	"time"
)

func TestWorkload(t *testing.T) {
const maxSteps = 3
const maxSleepUnits = 1000
const maxWorker = 8

func randSeedTimeNow() {
	rand.Seed(time.Now().UnixNano())
}

func workerProcess(t *testing.T, id int) {
	t.Logf("worker %d started", id)
	defer t.Logf("worker %d terminated", id)

	steps := 3 + rand.Intn(maxSteps)
	for step := 0; step < steps; step++ {
		t.Logf("worker %d at step %d: started", id, step)
		sleep := rand.Intn(maxSleepUnits)
		time.Sleep(time.Duration(sleep) * time.Microsecond)
		t.Logf("worker %d at step %d: finished", id, step)
	}
}

func TestWorkloadWaitGroup(t *testing.T) {
	randSeedTimeNow()

	var wg sync.WaitGroup
	for worker := 0; worker < maxWorker; worker++ {
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			workerProcess(t, w)
		}(worker)
	}
	wg.Wait()
}

func TestWorkloadChannels(t *testing.T) {
	randSeedTimeNow()

	result := make(chan int)

	//numFinders := runtime.NumCPU()

	for worker := 0; worker < maxWorker; worker++ {
		go func(w int) {
			defer func() { result <- w }()
			workerProcess(t, w)
		}(worker)
	}

	for {
		select {
		case r := <-result:
			t.Logf("r: %d", r)
		}
	}
}

const (
	numberOfPrimes = 5 //10
)

func TestPrimes(t *testing.T) {
	repeatFn := RepeatFn
	take := Take
	toInt := ToInt
	primeFinder := PrimeFinder
	rand := func() interface{} { return rand.Intn(50000000) }

	done := make(chan interface{})
	defer close(done)

	start := time.Now()

	randIntStream := toInt(done, repeatFn(done, rand))
	t.Logf("Primes:\n")
	for prime := range take(done, primeFinder(done, randIntStream), numberOfPrimes) {
		t.Logf("\t%d\n", prime)
	}

	t.Logf("Search took: %v", time.Since(start))
}

func TestPrimesDispatched(t *testing.T) {
	repeatFn := RepeatFn
	take := Take
	toInt := ToInt
	primeFinder := PrimeFinder
	fanIn := FanIn

	done := make(chan interface{})
	defer close(done)

	start := time.Now()

	rand := func() interface{} { return rand.Intn(50000000) }

	randIntStream := toInt(done, repeatFn(done, rand))

	numFinders := runtime.NumCPU()
	t.Logf("Spinning up %d prime finders.\n", numFinders)
	finders := make([]<-chan interface{}, numFinders)
	t.Logf("Primes:\n")
	for i := 0; i < numFinders; i++ {
		finders[i] = primeFinder(done, randIntStream)
	}

	for prime := range take(done, fanIn(done, finders...), numberOfPrimes) {
		t.Logf("\t%d\n", prime)
	}

	t.Logf("Search took: %v", time.Since(start))
}