Commit 2f5d1ab6 authored by Boris Mühmer's avatar Boris Mühmer
Browse files

another try with mutex buffer

parent f20355d8
Loading
Loading
Loading
Loading
+41 −12
Original line number Diff line number Diff line
package get

import (
	"bytes"
	"io"
	"strings"
	"sync"

	"repositories.muehmer.net/bsmrgo/get/mb"
)

type Channel uint
@@ -16,9 +17,11 @@ const (
)

type Expect struct {
	stdin       bytes.Buffer
	stdout      bytes.Buffer
	stderr      bytes.Buffer
	// bytes.Buffer
	// mb.MutexBuffer
	stdin       *mb.MutexBuffer
	stdout      *mb.MutexBuffer
	stderr      *mb.MutexBuffer
	matchStream chan struct {
		c Channel
		m Match
@@ -63,15 +66,15 @@ func (x *Expect) Close() error {
}

func (x *Expect) Stdin() io.Reader {
	return &x.stdin
	return x.stdin
}

func (x *Expect) Stdout() io.Writer {
	return &x.stdout
	return x.stdout
}

func (x *Expect) Stderr() io.Writer {
	return &x.stderr
	return x.stderr
}

func (x *Expect) Wait(c Channel, match, text string) error {
@@ -88,6 +91,10 @@ func (x *Expect) Wait(c Channel, match, text string) error {
func (x *Expect) worker(started chan<- bool) {
	var wg sync.WaitGroup

	x.stdin = &mb.MutexBuffer{}
	x.stdout = &mb.MutexBuffer{}
	x.stderr = &mb.MutexBuffer{}

	matchOutStream := make(chan Match)
	startedOutStream := make(chan bool)
	terminateOutStream := make(chan bool)
@@ -100,19 +107,29 @@ func (x *Expect) worker(started chan<- bool) {
	terminatedErrStream := make(chan bool)
	terminatedErr := false

	stringStream := make(chan string)
	defer close(stringStream)
	checkStream := make(chan bool)
	defer close(checkStream)

	wg.Add(2)
	go func() {
		defer wg.Done()
		process(&x.stdout, &x.stdin,
			startedOutStream, matchOutStream, terminateOutStream, terminatedOutStream)
		process(
			x.stdout, x.stdin,
			startedOutStream,
			matchOutStream,
			stringStream,
			terminateOutStream, terminatedOutStream)
	}()
	go func() {
		defer wg.Done()
		process(&x.stderr, &x.stdin,
			startedErrStream, matchErrStream, terminateErrStream, terminatedErrStream)
		process(
			x.stderr, x.stdin,
			startedErrStream,
			matchErrStream,
			stringStream,
			terminateErrStream, terminatedErrStream)
	}()

	<-startedOutStream
@@ -132,6 +149,14 @@ func (x *Expect) worker(started chan<- bool) {
			case Err:
				matchErrStream <- match.m
			}
		case s, valid := <-stringStream:
			if !valid {
				continue
			}
			//x.stdin.Reset()
			//x.stdin.Write([]byte(s))
			io.WriteString(x.stdin, s)
			//fmt.Fprintln(x.stdin, s)
		case terminate, valid := <-x.terminateStream:
			if !valid {
				continue
@@ -183,6 +208,7 @@ func process(
	out io.Writer,
	startedStream chan<- bool,
	matchStream <-chan Match,
	stringStream chan<- string,
	terminateStream <-chan bool,
	terminatedStream chan<- bool,
) {
@@ -219,7 +245,10 @@ func process(
				if !strings.Contains(s, m.p) {
					continue
				}
				io.WriteString(out, m.v)
				//io.WriteString(out, m.v)
				go func(s string) {
					stringStream <- s
				}(m.v)
				break
			}
		}

mb/mb.go

0 → 100644
+77 −0
Original line number Diff line number Diff line
package mb

import (
	"bytes"
	"sync"
)

type MutexBuffer struct {
	buffer bytes.Buffer
	mutex  sync.Mutex
}

func (mb *MutexBuffer) Read(p []byte) (n int, err error) {
	mb.mutex.Lock()
	defer mb.mutex.Unlock()
	return mb.buffer.Read(p)
}

func (mb *MutexBuffer) Write(p []byte) (n int, err error) {
	mb.mutex.Lock()
	defer mb.mutex.Unlock()
	return mb.buffer.Write(p)
}

func (mb *MutexBuffer) String() string {
	mb.mutex.Lock()
	defer mb.mutex.Unlock()
	return mb.buffer.String()
}

func (mb *MutexBuffer) Reset() {
	mb.mutex.Lock()
	defer mb.mutex.Unlock()
	mb.buffer.Reset()
}

func (mb *MutexBuffer) Truncate(n int) {
	mb.mutex.Lock()
	defer mb.mutex.Unlock()
	mb.buffer.Truncate(n)
}

func (mb *MutexBuffer) Bytes() []byte {
	mb.mutex.Lock()
	defer mb.mutex.Unlock()
	return mb.buffer.Bytes()
}
func (b *MutexBuffer) Cap() int {
	b.mutex.Lock()
	defer b.mutex.Unlock()
	return b.buffer.Cap()
}
func (mb *MutexBuffer) Grow(n int) {
	mb.mutex.Lock()
	defer mb.mutex.Unlock()
	mb.buffer.Grow(n)
}
func (mb *MutexBuffer) Len() int {
	mb.mutex.Lock()
	defer mb.mutex.Unlock()
	return mb.buffer.Len()
}
func (mb *MutexBuffer) Next(n int) []byte {
	mb.mutex.Lock()
	defer mb.mutex.Unlock()
	return mb.buffer.Next(n)
}
func (mb *MutexBuffer) ReadByte() (c byte, err error) {
	mb.mutex.Lock()
	defer mb.mutex.Unlock()
	return mb.buffer.ReadByte()
}
func (mb *MutexBuffer) ReadBytes(delim byte) (line []byte, err error) {
	mb.mutex.Lock()
	defer mb.mutex.Unlock()
	return mb.buffer.ReadBytes(delim)
}

mb/mb_test.go

0 → 100644
+7 −0
Original line number Diff line number Diff line
package mb

import "testing"

func TestSkip(t *testing.T) {
	t.Skip("not implemented")
}