Skip to content

Commit 1139656

Browse files
committed
feat: added generics sync
1 parent 9c25444 commit 1139656

File tree

7 files changed

+747
-24
lines changed

7 files changed

+747
-24
lines changed

go.mod

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,17 @@ go 1.24.2
55
tool mvdan.cc/gofumpt
66

77
require (
8-
github.com/go-json-experiment/json v0.0.0-20250223041408-d3c622f1b874
8+
github.com/go-json-experiment/json v0.0.0-20250626171732-1a886bd29d1b
99
github.com/onsi/gomega v1.37.0
10-
golang.org/x/sync v0.13.0
11-
golang.org/x/text v0.24.0
12-
golang.org/x/tools v0.32.0
10+
golang.org/x/sync v0.15.0
11+
golang.org/x/text v0.26.0
12+
golang.org/x/tools v0.34.0
1313
)
1414

1515
require (
1616
github.com/google/go-cmp v0.7.0 // indirect
17-
golang.org/x/mod v0.24.0 // indirect
18-
golang.org/x/net v0.39.0 // indirect
17+
golang.org/x/mod v0.25.0 // indirect
18+
golang.org/x/net v0.41.0 // indirect
1919
gopkg.in/yaml.v3 v3.0.1 // indirect
2020
mvdan.cc/gofumpt v0.7.0 // indirect
2121
)

go.sum

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
github.com/go-json-experiment/json v0.0.0-20250223041408-d3c622f1b874 h1:F8d1AJ6M9UQCavhwmO6ZsrYLfG8zVFWfEfMS2MXPkSY=
2-
github.com/go-json-experiment/json v0.0.0-20250223041408-d3c622f1b874/go.mod h1:TiCD2a1pcmjd7YnhGH0f/zKNcCD06B029pHhzV23c2M=
1+
github.com/go-json-experiment/json v0.0.0-20250626171732-1a886bd29d1b h1:ooF9/NzXkXL3OOLRwtPuQT/D7Kx2S5w/Kl1GnMF9h2s=
2+
github.com/go-json-experiment/json v0.0.0-20250626171732-1a886bd29d1b/go.mod h1:TiCD2a1pcmjd7YnhGH0f/zKNcCD06B029pHhzV23c2M=
33
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
44
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
55
github.com/go-quicktest/qt v1.101.0 h1:O1K29Txy5P2OK0dGo59b7b0LR6wKfIhttaAhHUyn7eI=
@@ -20,18 +20,18 @@ github.com/onsi/gomega v1.37.0 h1:CdEG8g0S133B4OswTDC/5XPSzE1OeP29QOioj2PID2Y=
2020
github.com/onsi/gomega v1.37.0/go.mod h1:8D9+Txp43QWKhM24yyOBEdpkzN8FvJyAwecBgsU4KU0=
2121
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
2222
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
23-
golang.org/x/mod v0.24.0 h1:ZfthKaKaT4NrhGVZHO1/WDTwGES4De8KtWO0SIbNJMU=
24-
golang.org/x/mod v0.24.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww=
25-
golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY=
26-
golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E=
27-
golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610=
28-
golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
29-
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
30-
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
31-
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
32-
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
33-
golang.org/x/tools v0.32.0 h1:Q7N1vhpkQv7ybVzLFtTjvQya2ewbwNDZzUgfXGqtMWU=
34-
golang.org/x/tools v0.32.0/go.mod h1:ZxrU41P/wAbZD8EDa6dDCa6XfpkhJ7HFMjHJXfBDu8s=
23+
golang.org/x/mod v0.25.0 h1:n7a+ZbQKQA/Ysbyb0/6IbB1H/X41mKgbhfv7AfG/44w=
24+
golang.org/x/mod v0.25.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww=
25+
golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw=
26+
golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA=
27+
golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8=
28+
golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
29+
golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw=
30+
golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
31+
golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M=
32+
golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA=
33+
golang.org/x/tools v0.34.0 h1:qIpSLOxeCYGg9TrcJokLBG4KFA6d795g0xkBkiESGlo=
34+
golang.org/x/tools v0.34.0/go.mod h1:pAP9OwEaY1CAW3HOmg3hLZC5Z0CCmzjAF2UQMSqNARg=
3535
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
3636
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
3737
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=

justfile

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@ fmt:
22
go tool gofumpt -w -l .
33

44
test:
5-
CGO_ENABLED=0 go test -v -failfast ./...
5+
CGO_ENABLED=0 go test -count=1 -failfast ./...
66

77
test-bench:
8-
CGO_ENABLED=0 go test -test.bench=. -test.benchmem ./...
8+
CGO_ENABLED=0 go test -count=1 -test.bench=. -test.benchmem ./...
99

10-
test-cover:
11-
CGO_ENABLED=1 go test -v -race -coverprofile=coverage.txt -covermode=atomic ./...
10+
test-race:
11+
CGO_ENABLED=1 go test -count=1 -race ./...
1212

1313
dep:
1414
go mod tidy

sync/map.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package sync
2+
3+
import "sync"
4+
5+
type Map[K comparable, V any] struct {
6+
m sync.Map
7+
}
8+
9+
func (m *Map[K, V]) typeAssertion(v any, result bool) (V, bool) {
10+
if vv, ok := v.(V); ok {
11+
return vv, result
12+
}
13+
return *new(V), result
14+
}
15+
16+
func (m *Map[K, V]) LoadOrStore(k K, value V) (V, bool) {
17+
return m.typeAssertion(m.m.LoadOrStore(k, value))
18+
}
19+
20+
func (m *Map[K, V]) Delete(k K) {
21+
m.m.Delete(k)
22+
}
23+
24+
func (m *Map[K, V]) Load(k K) (V, bool) {
25+
return m.typeAssertion(m.m.Load(k))
26+
}
27+
28+
func (m *Map[K, V]) LoadAndDelete(k K) (V, bool) {
29+
return m.typeAssertion(m.m.LoadAndDelete(k))
30+
}
31+
32+
func (m *Map[K, V]) Swap(k K, value V) (V, bool) {
33+
return m.typeAssertion(m.m.Swap(k, value))
34+
}
35+
36+
func (m *Map[K, V]) CompareAndSwap(k K, old V, new V) bool {
37+
return m.m.CompareAndSwap(k, old, new)
38+
}
39+
40+
func (m *Map[K, V]) CompareAndDelete(k K, old V) bool {
41+
return m.m.CompareAndDelete(k, old)
42+
}
43+
44+
func (m *Map[K, V]) Store(k K, v V) {
45+
m.m.Store(k, v)
46+
}
47+
48+
func (m *Map[K, V]) Clear() {
49+
m.m.Clear()
50+
}
51+
52+
func (m *Map[K, V]) Range(r func(k K, v V) bool) {
53+
m.m.Range(func(key, value any) bool {
54+
return r(key.(K), value.(V))
55+
})
56+
}

sync/pool.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package sync
2+
3+
import "sync"
4+
5+
type Pool[V any] struct {
6+
pool sync.Pool
7+
New func() V
8+
}
9+
10+
func (p *Pool[V]) Put(x V) {
11+
p.pool.Put(x)
12+
}
13+
14+
func (p *Pool[V]) Get() V {
15+
return p.pool.Get().(V)
16+
}

sync/singleflight/singleflight.go

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
// Copyright 2013 The Go Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
// Package singleflight provides a duplicate function call suppression
6+
// mechanism.
7+
package singleflight // import "github.com/catatsuy/sync/singleflight"
8+
9+
import (
10+
"bytes"
11+
"errors"
12+
"fmt"
13+
"runtime"
14+
"runtime/debug"
15+
"sync"
16+
)
17+
18+
// errGoexit indicates the runtime.Goexit was called in
19+
// the user given function.
20+
var errGoexit = errors.New("runtime.Goexit was called")
21+
22+
// A panicError is an arbitrary value recovered from a panic
23+
// with the stack trace during the execution of given function.
24+
type panicError struct {
25+
value any
26+
stack []byte
27+
}
28+
29+
// Error implements error interface.
30+
func (p *panicError) Error() string {
31+
return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
32+
}
33+
34+
func (p *panicError) Unwrap() error {
35+
err, ok := p.value.(error)
36+
if !ok {
37+
return nil
38+
}
39+
40+
return err
41+
}
42+
43+
func newPanicError(v any) error {
44+
stack := debug.Stack()
45+
46+
// The first line of the stack trace is of the form "goroutine N [status]:"
47+
// but by the time the panic reaches Do the goroutine may no longer exist
48+
// and its status will have changed. Trim out the misleading line.
49+
if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
50+
stack = stack[line+1:]
51+
}
52+
return &panicError{value: v, stack: stack}
53+
}
54+
55+
// call is an in-flight or completed singleflight.Do call
56+
type call[V any] struct {
57+
wg sync.WaitGroup
58+
59+
// These fields are written once before the WaitGroup is done
60+
// and are only read after the WaitGroup is done.
61+
val V
62+
err error
63+
64+
// These fields are read and written with the singleflight
65+
// mutex held before the WaitGroup is done, and are read but
66+
// not written after the WaitGroup is done.
67+
dups int
68+
chans []chan<- Result[V]
69+
}
70+
71+
type Group[K comparable] struct {
72+
g Group2[K, struct{}]
73+
}
74+
75+
func (g *Group[K]) Do(key K, fn func() error) (error, bool) {
76+
_, err, shared := g.g.Do(key, func() (struct{}, error) {
77+
return struct{}{}, fn()
78+
})
79+
return err, shared
80+
}
81+
82+
func (g *Group[K]) Forget(key K) {
83+
g.g.Forget(key)
84+
}
85+
86+
// Group2 represents a class of work and forms a namespace in
87+
// which units of work can be executed with duplicate suppression.
88+
type Group2[K comparable, V any] struct {
89+
mu sync.Mutex // protects m
90+
m map[K]*call[V] // lazily initialized
91+
}
92+
93+
// Result holds the results of Do, so they can be passed
94+
// on a channel.
95+
type Result[V any] struct {
96+
Val V
97+
Err error
98+
Shared bool
99+
}
100+
101+
// Do executes and returns the results of the given function, making
102+
// sure that only one execution is in-flight for a given key at a
103+
// time. If a duplicate comes in, the duplicate caller waits for the
104+
// original to complete and receives the same results.
105+
// The return value shared indicates whether v was given to multiple callers.
106+
func (g *Group2[K, V]) Do(key K, fn func() (V, error)) (v V, err error, shared bool) {
107+
g.mu.Lock()
108+
if g.m == nil {
109+
g.m = make(map[K]*call[V])
110+
}
111+
if c, ok := g.m[key]; ok {
112+
c.dups++
113+
g.mu.Unlock()
114+
c.wg.Wait()
115+
116+
if e, ok := c.err.(*panicError); ok {
117+
panic(e)
118+
} else if c.err == errGoexit {
119+
runtime.Goexit()
120+
}
121+
return c.val, c.err, true
122+
}
123+
c := new(call[V])
124+
c.wg.Add(1)
125+
g.m[key] = c
126+
g.mu.Unlock()
127+
128+
g.doCall(c, key, fn)
129+
return c.val, c.err, c.dups > 0
130+
}
131+
132+
// DoChan is like Do but returns a channel that will receive the
133+
// results when they are ready.
134+
//
135+
// The returned channel will not be closed.
136+
func (g *Group2[K, V]) DoChan(key K, fn func() (V, error)) <-chan Result[V] {
137+
ch := make(chan Result[V], 1)
138+
g.mu.Lock()
139+
if g.m == nil {
140+
g.m = make(map[K]*call[V])
141+
}
142+
if c, ok := g.m[key]; ok {
143+
c.dups++
144+
c.chans = append(c.chans, ch)
145+
g.mu.Unlock()
146+
return ch
147+
}
148+
c := &call[V]{chans: []chan<- Result[V]{ch}}
149+
c.wg.Add(1)
150+
g.m[key] = c
151+
g.mu.Unlock()
152+
153+
go g.doCall(c, key, fn)
154+
155+
return ch
156+
}
157+
158+
// doCall handles the single call for a key.
159+
func (g *Group2[K, V]) doCall(c *call[V], key K, fn func() (V, error)) {
160+
normalReturn := false
161+
recovered := false
162+
163+
// use double-defer to distinguish panic from runtime.Goexit,
164+
// more details see https://golang.org/cl/134395
165+
defer func() {
166+
// the given function invoked runtime.Goexit
167+
if !normalReturn && !recovered {
168+
c.err = errGoexit
169+
}
170+
171+
g.mu.Lock()
172+
defer g.mu.Unlock()
173+
c.wg.Done()
174+
if g.m[key] == c {
175+
delete(g.m, key)
176+
}
177+
178+
if e, ok := c.err.(*panicError); ok {
179+
// In order to prevent the waiting channels from being blocked forever,
180+
// needs to ensure that this panic cannot be recovered.
181+
if len(c.chans) > 0 {
182+
go panic(e)
183+
select {} // Keep this goroutine around so that it will appear in the crash dump.
184+
} else {
185+
panic(e)
186+
}
187+
} else if c.err == errGoexit {
188+
// Already in the process of goexit, no need to call again
189+
} else {
190+
// Normal return
191+
for _, ch := range c.chans {
192+
ch <- Result[V]{c.val, c.err, c.dups > 0}
193+
}
194+
}
195+
}()
196+
197+
func() {
198+
defer func() {
199+
if !normalReturn {
200+
// Ideally, we would wait to take a stack trace until we've determined
201+
// whether this is a panic or a runtime.Goexit.
202+
//
203+
// Unfortunately, the only way we can distinguish the two is to see
204+
// whether the recover stopped the goroutine from terminating, and by
205+
// the time we know that, the part of the stack trace relevant to the
206+
// panic has been discarded.
207+
if r := recover(); r != nil {
208+
c.err = newPanicError(r)
209+
}
210+
}
211+
}()
212+
213+
c.val, c.err = fn()
214+
normalReturn = true
215+
}()
216+
217+
if !normalReturn {
218+
recovered = true
219+
}
220+
}
221+
222+
// Forget tells the singleflight to forget about a key. Future calls
223+
// to Do for this key will call the function rather than waiting for
224+
// an earlier call to complete.
225+
func (g *Group2[K, V]) Forget(key K) {
226+
g.mu.Lock()
227+
delete(g.m, key)
228+
g.mu.Unlock()
229+
}

0 commit comments

Comments
 (0)