Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions aio_generic.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package gaio

import (
"container/list"
"errors"
"net"
"time"
Expand Down Expand Up @@ -110,8 +109,7 @@ type OpResult struct {

// aiocb contains all info for a single request
type aiocb struct {
l *list.List // list where this request belongs to
elem *list.Element
l *cbList // list where this request belongs to
ctx interface{} // user context associated with this request
ptr uintptr // pointer to conn
op OpType // read or write
Expand Down
20 changes: 20 additions & 0 deletions cblist.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package gaio

type cbList []*aiocb

func (l *cbList) PushBack(cb *aiocb) {
*l = append(*l, cb)
}

func (l *cbList) Remove(cb *aiocb) {
for idx, v := range *l {
if v == cb {
*l = append((*l)[:idx], (*l)[idx+1:]...)
return
}
}
}

func (l *cbList) RemoveHeadN(n int) {
*l = append((*l)[:0], (*l)[n:]...)
}
43 changes: 19 additions & 24 deletions watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package gaio

import (
"container/heap"
"container/list"
"io"
"net"
"reflect"
Expand All @@ -31,8 +30,8 @@ func init() {

// fdDesc contains all data structures associated to fd
type fdDesc struct {
readers list.List // all read/write requests
writers list.List
readers cbList // all read/write requests
writers cbList
ptr uintptr // pointer to net.Conn
r_armed bool
w_armed bool
Expand Down Expand Up @@ -385,15 +384,13 @@ func (w *watcher) tryWrite(fd int, pcb *aiocb) bool {
func (w *watcher) releaseConn(ident int) {
if desc, ok := w.descs[ident]; ok {
// delete from heap
for e := desc.readers.Front(); e != nil; e = e.Next() {
tcb := e.Value.(*aiocb)
for _, tcb := range desc.readers {
// notify caller
tcb.err = io.ErrClosedPipe
w.deliver(tcb)
}

for e := desc.writers.Front(); e != nil; e = e.Next() {
tcb := e.Value.(*aiocb)
for _, tcb := range desc.writers {
tcb.err = io.ErrClosedPipe
w.deliver(tcb)
}
Expand Down Expand Up @@ -447,7 +444,7 @@ func (w *watcher) loop() {
// ErrDeadline
pcb.err = ErrDeadline
// remove from list
pcb.l.Remove(pcb.elem)
pcb.l.Remove(pcb)
w.deliver(pcb)
} else {
w.timer.Reset(pcb.deadline.Sub(now))
Expand Down Expand Up @@ -538,29 +535,29 @@ func (w *watcher) handlePending(pending []*aiocb) {
switch pcb.op {
case OpRead:
// try immediately queue is empty
if desc.readers.Len() == 0 {
if len(desc.readers) == 0 {
if w.tryRead(ident, pcb) {
w.deliver(pcb)
continue
}
}
// enqueue for poller events
pcb.l = &desc.readers
pcb.elem = pcb.l.PushBack(pcb)
pcb.l.PushBack(pcb)

if !desc.r_armed {
desc.r_armed = true
}
case OpWrite:
if desc.writers.Len() == 0 {
if len(desc.writers) == 0 {
if w.tryWrite(ident, pcb) {
w.deliver(pcb)
continue
}
}

pcb.l = &desc.writers
pcb.elem = pcb.l.PushBack(pcb)
pcb.l.PushBack(pcb)

if !desc.w_armed {
desc.w_armed = true
Expand Down Expand Up @@ -595,38 +592,36 @@ func (w *watcher) handleEvents(pe pollerEvents) {
if desc, ok := w.descs[e.ident]; ok {
if e.ev&EV_READ != 0 {
desc.r_armed = false
var next *list.Element
for elem := desc.readers.Front(); elem != nil; elem = next {
next = elem.Next()
pcb := elem.Value.(*aiocb)
count := 0
for _, pcb := range desc.readers {
if w.tryRead(e.ident, pcb) {
w.deliver(pcb)
desc.readers.Remove(elem)
count++
} else {
break
}
}
desc.readers.RemoveHeadN(count)

if desc.readers.Len() > 0 {
if len(desc.readers) > 0 {
desc.r_armed = true
}
}

if e.ev&EV_WRITE != 0 {
desc.w_armed = false
var next *list.Element
for elem := desc.writers.Front(); elem != nil; elem = next {
next = elem.Next()
pcb := elem.Value.(*aiocb)
count := 0
for _, pcb := range desc.writers {
if w.tryWrite(e.ident, pcb) {
w.deliver(pcb)
desc.writers.Remove(elem)
count++
} else {
break
}
}
desc.writers.RemoveHeadN(count)

if desc.writers.Len() > 0 {
if len(desc.writers) > 0 {
desc.w_armed = true
}
}
Expand Down