Loading get.go +42 −58 Original line number Diff line number Diff line Loading @@ -21,10 +21,7 @@ type Expect struct { stdin io.Reader stdout io.Writer stderr io.Writer matchStream chan struct { c Channel m Match } matchStream chan ChannelMatch terminateStream chan bool finishedStream chan bool } Loading @@ -34,12 +31,13 @@ type Match struct { v string } func New() (*Expect, error) { x := &Expect{ matchStream: make(chan struct { type ChannelMatch struct { c Channel m Match }), } func New() (*Expect, error) { x := &Expect{ terminateStream: make(chan bool), finishedStream: make(chan bool), } Loading Loading @@ -77,10 +75,7 @@ func (x *Expect) Stderr() io.Writer { } func (x *Expect) Wait(c Channel, match, text string) error { x.matchStream <- struct { c Channel m Match }{ x.matchStream <- ChannelMatch{ c: c, m: Match{p: match, v: text}, } Loading @@ -88,8 +83,6 @@ func (x *Expect) Wait(c Channel, match, text string) error { } func (x *Expect) worker(started chan<- bool) { var wg sync.WaitGroup stdinReader, stdinWriter := io.Pipe() stdout := &mb.MutexBuffer{} stderr := &mb.MutexBuffer{} Loading @@ -98,6 +91,8 @@ func (x *Expect) worker(started chan<- bool) { x.stdout = stdout x.stderr = stderr x.matchStream = make(chan ChannelMatch) matchOutStream := make(chan Match) startedOutStream := make(chan bool) terminateOutStream := make(chan bool) Loading @@ -115,6 +110,7 @@ func (x *Expect) worker(started chan<- bool) { checkStream := make(chan bool) defer close(checkStream) var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() Loading Loading @@ -143,13 +139,12 @@ func (x *Expect) worker(started chan<- bool) { for { select { case match, valid := <-x.matchStream: if !valid { switch { case !valid: continue } switch match.c { case Out: case match.c == Out: matchOutStream <- match.m case Err: case match.c == Err: matchErrStream <- match.m } case s, valid := <-stringStream: Loading @@ -158,50 +153,40 @@ func (x *Expect) worker(started chan<- bool) { } fmt.Fprintln(stdinWriter, s) case terminate, valid := <-x.terminateStream: if !valid { continue } if !terminate { switch { case !valid || !terminate: continue } default: async(terminateOutStream, true) async(terminateErrStream, true) case terminated, valid := <-terminatedOutStream: if !valid { continue } if !terminated { case terminated, valid := <-terminatedOutStream: switch { case !valid || !terminated: continue } default: terminatedOut = true async(checkStream, true) case terminated, valid := <-terminatedErrStream: if !valid { continue } if !terminated { case terminated, valid := <-terminatedErrStream: switch { case !valid || !terminated: continue } default: terminatedErr = true async(checkStream, true) case check, valid := <-checkStream: if !valid { continue } if !check { continue } if !terminatedOut { continue } if !terminatedErr { case check, valid := <-checkStream: switch { case !valid || !check || !terminatedOut || !terminatedErr: continue } default: wg.Wait() x.finishedStream <- true } } } } func process( in io.Reader, Loading @@ -211,6 +196,7 @@ func process( terminateStream <-chan bool, terminatedStream chan<- bool, ) { defer async(terminatedStream, true) matches := []Match{} startedStream <- true Loading @@ -223,14 +209,12 @@ func process( } matches = append(matches, match) case terminate, valid := <-terminateStream: if !valid { continue } if !terminate { switch { case !valid || !terminate: continue } terminatedStream <- true default: return } default: bytes, err := io.ReadAll(in) if err != nil { Loading Loading
get.go +42 −58 Original line number Diff line number Diff line Loading @@ -21,10 +21,7 @@ type Expect struct { stdin io.Reader stdout io.Writer stderr io.Writer matchStream chan struct { c Channel m Match } matchStream chan ChannelMatch terminateStream chan bool finishedStream chan bool } Loading @@ -34,12 +31,13 @@ type Match struct { v string } func New() (*Expect, error) { x := &Expect{ matchStream: make(chan struct { type ChannelMatch struct { c Channel m Match }), } func New() (*Expect, error) { x := &Expect{ terminateStream: make(chan bool), finishedStream: make(chan bool), } Loading Loading @@ -77,10 +75,7 @@ func (x *Expect) Stderr() io.Writer { } func (x *Expect) Wait(c Channel, match, text string) error { x.matchStream <- struct { c Channel m Match }{ x.matchStream <- ChannelMatch{ c: c, m: Match{p: match, v: text}, } Loading @@ -88,8 +83,6 @@ func (x *Expect) Wait(c Channel, match, text string) error { } func (x *Expect) worker(started chan<- bool) { var wg sync.WaitGroup stdinReader, stdinWriter := io.Pipe() stdout := &mb.MutexBuffer{} stderr := &mb.MutexBuffer{} Loading @@ -98,6 +91,8 @@ func (x *Expect) worker(started chan<- bool) { x.stdout = stdout x.stderr = stderr x.matchStream = make(chan ChannelMatch) matchOutStream := make(chan Match) startedOutStream := make(chan bool) terminateOutStream := make(chan bool) Loading @@ -115,6 +110,7 @@ func (x *Expect) worker(started chan<- bool) { checkStream := make(chan bool) defer close(checkStream) var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() Loading Loading @@ -143,13 +139,12 @@ func (x *Expect) worker(started chan<- bool) { for { select { case match, valid := <-x.matchStream: if !valid { switch { case !valid: continue } switch match.c { case Out: case match.c == Out: matchOutStream <- match.m case Err: case match.c == Err: matchErrStream <- match.m } case s, valid := <-stringStream: Loading @@ -158,50 +153,40 @@ func (x *Expect) worker(started chan<- bool) { } fmt.Fprintln(stdinWriter, s) case terminate, valid := <-x.terminateStream: if !valid { continue } if !terminate { switch { case !valid || !terminate: continue } default: async(terminateOutStream, true) async(terminateErrStream, true) case terminated, valid := <-terminatedOutStream: if !valid { continue } if !terminated { case terminated, valid := <-terminatedOutStream: switch { case !valid || !terminated: continue } default: terminatedOut = true async(checkStream, true) case terminated, valid := <-terminatedErrStream: if !valid { continue } if !terminated { case terminated, valid := <-terminatedErrStream: switch { case !valid || !terminated: continue } default: terminatedErr = true async(checkStream, true) case check, valid := <-checkStream: if !valid { continue } if !check { continue } if !terminatedOut { continue } if !terminatedErr { case check, valid := <-checkStream: switch { case !valid || !check || !terminatedOut || !terminatedErr: continue } default: wg.Wait() x.finishedStream <- true } } } } func process( in io.Reader, Loading @@ -211,6 +196,7 @@ func process( terminateStream <-chan bool, terminatedStream chan<- bool, ) { defer async(terminatedStream, true) matches := []Match{} startedStream <- true Loading @@ -223,14 +209,12 @@ func process( } matches = append(matches, match) case terminate, valid := <-terminateStream: if !valid { continue } if !terminate { switch { case !valid || !terminate: continue } terminatedStream <- true default: return } default: bytes, err := io.ReadAll(in) if err != nil { Loading