Skip to content

Commit 7f89c02

Browse files
committed
Add base for thread safe go port.
1 parent 46a87be commit 7f89c02

File tree

3 files changed

+178
-12
lines changed

3 files changed

+178
-12
lines changed

source/ports/go_port/source/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
module github.com/metacall/core/source/ports/go_port/source
22

3-
go 1.14
3+
go 1.17

source/ports/go_port/source/go_port.go

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,25 @@ package metacall
77
import "C"
88

99
import (
10-
"unsafe"
1110
"errors"
11+
"unsafe"
1212
)
1313

1414
const PtrSizeInBytes = (32 << uintptr(^uintptr(0)>>63)) >> 3
1515

1616
func Initialize() error {
1717
// TODO: Remove this once go loader is implemented
18-
if (int(C.metacall_initialize()) != 0) {
18+
if int(C.metacall_initialize()) != 0 {
1919
return errors.New("MetaCall failed to initialize")
2020
}
2121

2222
return nil
2323
}
2424

25+
func InitializeSafe() error {
26+
// TODO
27+
}
28+
2529
func LoadFromFile(tag string, scripts []string) error {
2630
size := len(scripts)
2731

@@ -32,7 +36,7 @@ func LoadFromFile(tag string, scripts []string) error {
3236
defer C.free(unsafe.Pointer(cScripts))
3337

3438
// Convert cScripts to a Go Array so we can index it
35-
goScripts := (*[1 << 30 - 1] * C.char)(cScripts)
39+
goScripts := (*[1<<30 - 1]*C.char)(cScripts)
3640

3741
for index, script := range scripts {
3842
goScripts[index] = C.CString(script)
@@ -62,7 +66,7 @@ func Call(function string, args ...interface{}) (interface{}, error) {
6266
defer C.free(unsafe.Pointer(cArgs))
6367

6468
for index, arg := range args {
65-
cArg := (*unsafe.Pointer)(unsafe.Pointer(uintptr(unsafe.Pointer(cArgs)) + uintptr(index) * PtrSizeInBytes))
69+
cArg := (*unsafe.Pointer)(unsafe.Pointer(uintptr(unsafe.Pointer(cArgs)) + uintptr(index)*PtrSizeInBytes))
6670

6771
// Create int
6872
if i, ok := arg.(int); ok {
@@ -89,9 +93,9 @@ func Call(function string, args ...interface{}) (interface{}, error) {
8993
// TODO: Other types ...
9094
}
9195

92-
defer (func () {
96+
defer (func() {
9397
for index, _ := range args {
94-
cArg := (*unsafe.Pointer)(unsafe.Pointer(uintptr(unsafe.Pointer(cArgs)) + uintptr(index) * PtrSizeInBytes))
98+
cArg := (*unsafe.Pointer)(unsafe.Pointer(uintptr(unsafe.Pointer(cArgs)) + uintptr(index)*PtrSizeInBytes))
9599
C.metacall_value_destroy(*cArg)
96100
}
97101
})()
@@ -101,20 +105,24 @@ func Call(function string, args ...interface{}) (interface{}, error) {
101105
if ret != nil {
102106
defer C.metacall_value_destroy(ret)
103107

104-
switch (C.metacall_value_id(unsafe.Pointer(ret))) {
105-
case C.METACALL_INT: {
108+
switch C.metacall_value_id(unsafe.Pointer(ret)) {
109+
case C.METACALL_INT:
110+
{
106111
return int(C.metacall_value_to_int(unsafe.Pointer(ret))), nil
107112
}
108113

109-
case C.METACALL_FLOAT: {
114+
case C.METACALL_FLOAT:
115+
{
110116
return float32(C.metacall_value_to_float(unsafe.Pointer(ret))), nil
111117
}
112118

113-
case C.METACALL_DOUBLE: {
119+
case C.METACALL_DOUBLE:
120+
{
114121
return float64(C.metacall_value_to_double(unsafe.Pointer(ret))), nil
115122
}
116123

117-
case C.METACALL_STRING: {
124+
case C.METACALL_STRING:
125+
{
118126
return C.GoString(C.metacall_value_to_string(unsafe.Pointer(ret))), nil
119127
}
120128

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
package metacall
2+
3+
import (
4+
"sync/atomic"
5+
"unsafe"
6+
)
7+
8+
type node struct {
9+
value interface{}
10+
next unsafe.Pointer
11+
}
12+
13+
type queue struct {
14+
head unsafe.Pointer
15+
tail unsafe.Pointer
16+
}
17+
18+
func load(p *unsafe.Pointer) *node {
19+
return (*node)(atomic.LoadPointer(p))
20+
}
21+
22+
func store(p *unsafe.Pointer, n *node) {
23+
atomic.StorePointer(p, unsafe.Pointer(n))
24+
}
25+
26+
func cas(p *unsafe.Pointer, old, new *node) bool {
27+
return atomic.CompareAndSwapPointer(
28+
p, unsafe.Pointer(old), unsafe.Pointer(new))
29+
}
30+
31+
func newQueue() (q *queue) {
32+
n := unsafe.Pointer(&node{})
33+
q = &queue{head: n, tail: n}
34+
35+
return q
36+
}
37+
38+
func (q *queue) enqueue(v interface{}) {
39+
n := &node{value: v}
40+
for {
41+
last := load(&q.tail)
42+
next := load(&last.next)
43+
if last == load(&q.tail) {
44+
if next == nil {
45+
if cas(&last.next, next, n) {
46+
cas(&q.tail, last, n)
47+
return
48+
}
49+
} else {
50+
cas(&q.tail, last, next)
51+
}
52+
}
53+
}
54+
}
55+
56+
func (q *queue) dequeue() (interface{}, bool) {
57+
first := load(&q.head)
58+
last := load(&q.tail)
59+
next := load(&first.next)
60+
if first == load(&q.head) {
61+
if first == last {
62+
if next == nil {
63+
return nil, false
64+
}
65+
cas(&q.tail, last, next)
66+
} else {
67+
v := next.value
68+
if cas(&q.head, first, next) {
69+
return v, true
70+
}
71+
}
72+
}
73+
74+
return nil, false
75+
}
76+
77+
func (q *queue) iterator() <-chan interface{} {
78+
c := make(chan interface{})
79+
go func(q *queue, c chan<- interface{}) {
80+
for {
81+
v, ok := q.dequeue()
82+
if !ok {
83+
break
84+
}
85+
86+
c <- v
87+
}
88+
close(c)
89+
}(q, c)
90+
return c
91+
}
92+
93+
type Queue struct {
94+
impl *queue
95+
watchers *queue
96+
quit chan int
97+
}
98+
99+
func NewQueue() *Queue {
100+
return &Queue{
101+
impl: newQueue(),
102+
watchers: newQueue(),
103+
quit: make(chan int),
104+
}
105+
}
106+
107+
func (q *Queue) Enqueue(v interface{}) {
108+
q.impl.enqueue(v)
109+
110+
for notify := range q.watchers.iterator() {
111+
c := notify.(chan int)
112+
113+
go func() {
114+
c <- 1
115+
}()
116+
}
117+
}
118+
119+
func (q *Queue) Dequeue() (interface{}, bool) {
120+
return q.impl.dequeue()
121+
}
122+
123+
func (q *Queue) Iterator() <-chan interface{} {
124+
c := make(chan interface{})
125+
go func(q *Queue, c chan<- interface{}) {
126+
for {
127+
v, ok := q.Dequeue()
128+
129+
if !ok {
130+
notify := make(chan int)
131+
q.watchers.enqueue(notify)
132+
133+
select {
134+
case <-notify:
135+
continue
136+
case <-q.quit:
137+
go func() {
138+
<-notify
139+
}()
140+
goto end
141+
}
142+
143+
} else {
144+
c <- v
145+
}
146+
}
147+
148+
end:
149+
close(c)
150+
}(q, c)
151+
return c
152+
}
153+
154+
func (q *Queue) Close() {
155+
go func() {
156+
q.quit <- 1
157+
}()
158+
}

0 commit comments

Comments
 (0)