@@ -2,36 +2,62 @@ package cmap
22
33import (
44 "encoding/json"
5+ "fmt"
56 "sync"
67)
78
89var SHARD_COUNT = 32
910
11+ type Stringer interface {
12+ fmt.Stringer
13+ comparable
14+ }
15+
1016// A "thread" safe map of type string:Anything.
1117// To avoid lock bottlenecks this map is dived to several (SHARD_COUNT) map shards.
12- type ConcurrentMap [V any ] []* ConcurrentMapShared [V ]
18+ type ConcurrentMap [K comparable , V any ] struct {
19+ shards []* ConcurrentMapShared [K , V ]
20+ sharding func (key K ) uint32
21+ }
1322
1423// A "thread" safe string to anything map.
15- type ConcurrentMapShared [V any ] struct {
16- items map [string ]V
24+ type ConcurrentMapShared [K comparable , V any ] struct {
25+ items map [K ]V
1726 sync.RWMutex // Read Write mutex, guards access to internal map.
1827}
1928
20- // Creates a new concurrent map.
21- func New [V any ]() ConcurrentMap [V ] {
22- m := make (ConcurrentMap [V ], SHARD_COUNT )
29+ func create [K comparable , V any ](sharding func (key K ) uint32 ) ConcurrentMap [K , V ] {
30+ m := ConcurrentMap [K , V ]{
31+ sharding : sharding ,
32+ shards : make ([]* ConcurrentMapShared [K , V ], SHARD_COUNT ),
33+ }
2334 for i := 0 ; i < SHARD_COUNT ; i ++ {
24- m [i ] = & ConcurrentMapShared [V ]{items : make (map [string ]V )}
35+ m . shards [i ] = & ConcurrentMapShared [K , V ]{items : make (map [K ]V )}
2536 }
2637 return m
2738}
2839
40+ // Creates a new concurrent map.
41+ func New [V any ]() ConcurrentMap [string , V ] {
42+ return create [string , V ](fnv32 )
43+ }
44+
45+ // Creates a new concurrent map.
46+ func NewStringer [K Stringer , V any ]() ConcurrentMap [K , V ] {
47+ return create [K , V ](strfnv32 [K ])
48+ }
49+
50+ // Creates a new concurrent map.
51+ func NewWithCustomShardingFunction [K comparable , V any ](sharding func (key K ) uint32 ) ConcurrentMap [K , V ] {
52+ return create [K , V ](sharding )
53+ }
54+
2955// GetShard returns shard under given key
30- func (m ConcurrentMap [V ]) GetShard (key string ) * ConcurrentMapShared [V ] {
31- return m [uint (fnv32 (key ))% uint (SHARD_COUNT )]
56+ func (m ConcurrentMap [K , V ]) GetShard (key K ) * ConcurrentMapShared [K , V ] {
57+ return m . shards [uint (m . sharding (key ))% uint (SHARD_COUNT )]
3258}
3359
34- func (m ConcurrentMap [V ]) MSet (data map [string ]V ) {
60+ func (m ConcurrentMap [K , V ]) MSet (data map [K ]V ) {
3561 for key , value := range data {
3662 shard := m .GetShard (key )
3763 shard .Lock ()
@@ -41,7 +67,7 @@ func (m ConcurrentMap[V]) MSet(data map[string]V) {
4167}
4268
4369// Sets the given value under the specified key.
44- func (m ConcurrentMap [V ]) Set (key string , value V ) {
70+ func (m ConcurrentMap [K , V ]) Set (key K , value V ) {
4571 // Get map shard.
4672 shard := m .GetShard (key )
4773 shard .Lock ()
@@ -56,7 +82,7 @@ func (m ConcurrentMap[V]) Set(key string, value V) {
5682type UpsertCb [V any ] func (exist bool , valueInMap V , newValue V ) V
5783
5884// Insert or Update - updates existing element or inserts a new one using UpsertCb
59- func (m ConcurrentMap [V ]) Upsert (key string , value V , cb UpsertCb [V ]) (res V ) {
85+ func (m ConcurrentMap [K , V ]) Upsert (key K , value V , cb UpsertCb [V ]) (res V ) {
6086 shard := m .GetShard (key )
6187 shard .Lock ()
6288 v , ok := shard .items [key ]
@@ -67,7 +93,7 @@ func (m ConcurrentMap[V]) Upsert(key string, value V, cb UpsertCb[V]) (res V) {
6793}
6894
6995// Sets the given value under the specified key if no value was associated with it.
70- func (m ConcurrentMap [V ]) SetIfAbsent (key string , value V ) bool {
96+ func (m ConcurrentMap [K , V ]) SetIfAbsent (key K , value V ) bool {
7197 // Get map shard.
7298 shard := m .GetShard (key )
7399 shard .Lock ()
@@ -80,7 +106,7 @@ func (m ConcurrentMap[V]) SetIfAbsent(key string, value V) bool {
80106}
81107
82108// Get retrieves an element from map under given key.
83- func (m ConcurrentMap [V ]) Get (key string ) (V , bool ) {
109+ func (m ConcurrentMap [K , V ]) Get (key K ) (V , bool ) {
84110 // Get shard
85111 shard := m .GetShard (key )
86112 shard .RLock ()
@@ -91,10 +117,10 @@ func (m ConcurrentMap[V]) Get(key string) (V, bool) {
91117}
92118
93119// Count returns the number of elements within the map.
94- func (m ConcurrentMap [V ]) Count () int {
120+ func (m ConcurrentMap [K , V ]) Count () int {
95121 count := 0
96122 for i := 0 ; i < SHARD_COUNT ; i ++ {
97- shard := m [i ]
123+ shard := m . shards [i ]
98124 shard .RLock ()
99125 count += len (shard .items )
100126 shard .RUnlock ()
@@ -103,7 +129,7 @@ func (m ConcurrentMap[V]) Count() int {
103129}
104130
105131// Looks up an item under specified key
106- func (m ConcurrentMap [V ]) Has (key string ) bool {
132+ func (m ConcurrentMap [K , V ]) Has (key K ) bool {
107133 // Get shard
108134 shard := m .GetShard (key )
109135 shard .RLock ()
@@ -114,7 +140,7 @@ func (m ConcurrentMap[V]) Has(key string) bool {
114140}
115141
116142// Remove removes an element from the map.
117- func (m ConcurrentMap [V ]) Remove (key string ) {
143+ func (m ConcurrentMap [K , V ]) Remove (key K ) {
118144 // Try to get shard.
119145 shard := m .GetShard (key )
120146 shard .Lock ()
@@ -124,12 +150,12 @@ func (m ConcurrentMap[V]) Remove(key string) {
124150
125151// RemoveCb is a callback executed in a map.RemoveCb() call, while Lock is held
126152// If returns true, the element will be removed from the map
127- type RemoveCb [V any ] func (key string , v V , exists bool ) bool
153+ type RemoveCb [K any , V any ] func (key K , v V , exists bool ) bool
128154
129155// RemoveCb locks the shard containing the key, retrieves its current value and calls the callback with those params
130156// If callback returns true and element exists, it will remove it from the map
131157// Returns the value returned by the callback (even if element was not present in the map)
132- func (m ConcurrentMap [V ]) RemoveCb (key string , cb RemoveCb [V ]) bool {
158+ func (m ConcurrentMap [K , V ]) RemoveCb (key K , cb RemoveCb [K , V ]) bool {
133159 // Try to get shard.
134160 shard := m .GetShard (key )
135161 shard .Lock ()
@@ -143,7 +169,7 @@ func (m ConcurrentMap[V]) RemoveCb(key string, cb RemoveCb[V]) bool {
143169}
144170
145171// Pop removes an element from the map and returns it
146- func (m ConcurrentMap [V ]) Pop (key string ) (v V , exists bool ) {
172+ func (m ConcurrentMap [K , V ]) Pop (key K ) (v V , exists bool ) {
147173 // Try to get shard.
148174 shard := m .GetShard (key )
149175 shard .Lock ()
@@ -154,40 +180,40 @@ func (m ConcurrentMap[V]) Pop(key string) (v V, exists bool) {
154180}
155181
156182// IsEmpty checks if map is empty.
157- func (m ConcurrentMap [V ]) IsEmpty () bool {
183+ func (m ConcurrentMap [K , V ]) IsEmpty () bool {
158184 return m .Count () == 0
159185}
160186
161187// Used by the Iter & IterBuffered functions to wrap two variables together over a channel,
162- type Tuple [V any ] struct {
163- Key string
188+ type Tuple [K comparable , V any ] struct {
189+ Key K
164190 Val V
165191}
166192
167193// Iter returns an iterator which could be used in a for range loop.
168194//
169195// Deprecated: using IterBuffered() will get a better performence
170- func (m ConcurrentMap [V ]) Iter () <- chan Tuple [V ] {
196+ func (m ConcurrentMap [K , V ]) Iter () <- chan Tuple [K , V ] {
171197 chans := snapshot (m )
172- ch := make (chan Tuple [V ])
198+ ch := make (chan Tuple [K , V ])
173199 go fanIn (chans , ch )
174200 return ch
175201}
176202
177203// IterBuffered returns a buffered iterator which could be used in a for range loop.
178- func (m ConcurrentMap [V ]) IterBuffered () <- chan Tuple [V ] {
204+ func (m ConcurrentMap [K , V ]) IterBuffered () <- chan Tuple [K , V ] {
179205 chans := snapshot (m )
180206 total := 0
181207 for _ , c := range chans {
182208 total += cap (c )
183209 }
184- ch := make (chan Tuple [V ], total )
210+ ch := make (chan Tuple [K , V ], total )
185211 go fanIn (chans , ch )
186212 return ch
187213}
188214
189215// Clear removes all items from map.
190- func (m ConcurrentMap [V ]) Clear () {
216+ func (m ConcurrentMap [K , V ]) Clear () {
191217 for item := range m .IterBuffered () {
192218 m .Remove (item .Key )
193219 }
@@ -197,23 +223,23 @@ func (m ConcurrentMap[V]) Clear() {
197223// which likely takes a snapshot of `m`.
198224// It returns once the size of each buffered channel is determined,
199225// before all the channels are populated using goroutines.
200- func snapshot [V any ](m ConcurrentMap [V ]) (chans []chan Tuple [V ]) {
226+ func snapshot [K comparable , V any ](m ConcurrentMap [K , V ]) (chans []chan Tuple [K , V ]) {
201227 //When you access map items before initializing.
202- if len (m ) == 0 {
228+ if len (m . shards ) == 0 {
203229 panic (`cmap.ConcurrentMap is not initialized. Should run New() before usage.` )
204230 }
205- chans = make ([]chan Tuple [V ], SHARD_COUNT )
231+ chans = make ([]chan Tuple [K , V ], SHARD_COUNT )
206232 wg := sync.WaitGroup {}
207233 wg .Add (SHARD_COUNT )
208234 // Foreach shard.
209- for index , shard := range m {
210- go func (index int , shard * ConcurrentMapShared [V ]) {
235+ for index , shard := range m . shards {
236+ go func (index int , shard * ConcurrentMapShared [K , V ]) {
211237 // Foreach key, value pair.
212238 shard .RLock ()
213- chans [index ] = make (chan Tuple [V ], len (shard .items ))
239+ chans [index ] = make (chan Tuple [K , V ], len (shard .items ))
214240 wg .Done ()
215241 for key , val := range shard .items {
216- chans [index ] <- Tuple [V ]{key , val }
242+ chans [index ] <- Tuple [K , V ]{key , val }
217243 }
218244 shard .RUnlock ()
219245 close (chans [index ])
@@ -224,11 +250,11 @@ func snapshot[V any](m ConcurrentMap[V]) (chans []chan Tuple[V]) {
224250}
225251
226252// fanIn reads elements from channels `chans` into channel `out`
227- func fanIn [V any ](chans []chan Tuple [V ], out chan Tuple [V ]) {
253+ func fanIn [K comparable , V any ](chans []chan Tuple [K , V ], out chan Tuple [K , V ]) {
228254 wg := sync.WaitGroup {}
229255 wg .Add (len (chans ))
230256 for _ , ch := range chans {
231- go func (ch chan Tuple [V ]) {
257+ go func (ch chan Tuple [K , V ]) {
232258 for t := range ch {
233259 out <- t
234260 }
@@ -240,8 +266,8 @@ func fanIn[V any](chans []chan Tuple[V], out chan Tuple[V]) {
240266}
241267
242268// Items returns all items as map[string]V
243- func (m ConcurrentMap [V ]) Items () map [string ]V {
244- tmp := make (map [string ]V )
269+ func (m ConcurrentMap [K , V ]) Items () map [K ]V {
270+ tmp := make (map [K ]V )
245271
246272 // Insert items to temporary map.
247273 for item := range m .IterBuffered () {
@@ -255,13 +281,13 @@ func (m ConcurrentMap[V]) Items() map[string]V {
255281// maps. RLock is held for all calls for a given shard
256282// therefore callback sess consistent view of a shard,
257283// but not across the shards
258- type IterCb [V any ] func (key string , v V )
284+ type IterCb [K comparable , V any ] func (key K , v V )
259285
260286// Callback based iterator, cheapest way to read
261287// all elements in a map.
262- func (m ConcurrentMap [V ]) IterCb (fn IterCb [V ]) {
263- for idx := range m {
264- shard := (m )[idx ]
288+ func (m ConcurrentMap [K , V ]) IterCb (fn IterCb [K , V ]) {
289+ for idx := range m . shards {
290+ shard := (m . shards )[idx ]
265291 shard .RLock ()
266292 for key , value := range shard .items {
267293 fn (key , value )
@@ -271,15 +297,15 @@ func (m ConcurrentMap[V]) IterCb(fn IterCb[V]) {
271297}
272298
273299// Keys returns all keys as []string
274- func (m ConcurrentMap [V ]) Keys () []string {
300+ func (m ConcurrentMap [K , V ]) Keys () []K {
275301 count := m .Count ()
276- ch := make (chan string , count )
302+ ch := make (chan K , count )
277303 go func () {
278304 // Foreach shard.
279305 wg := sync.WaitGroup {}
280306 wg .Add (SHARD_COUNT )
281- for _ , shard := range m {
282- go func (shard * ConcurrentMapShared [V ]) {
307+ for _ , shard := range m . shards {
308+ go func (shard * ConcurrentMapShared [K , V ]) {
283309 // Foreach key, value pair.
284310 shard .RLock ()
285311 for key := range shard .items {
@@ -294,24 +320,27 @@ func (m ConcurrentMap[V]) Keys() []string {
294320 }()
295321
296322 // Generate keys
297- keys := make ([]string , 0 , count )
323+ keys := make ([]K , 0 , count )
298324 for k := range ch {
299325 keys = append (keys , k )
300326 }
301327 return keys
302328}
303329
304- //Reviles ConcurrentMap "private" variables to json marshal.
305- func (m ConcurrentMap [V ]) MarshalJSON () ([]byte , error ) {
330+ // Reviles ConcurrentMap "private" variables to json marshal.
331+ func (m ConcurrentMap [K , V ]) MarshalJSON () ([]byte , error ) {
306332 // Create a temporary map, which will hold all item spread across shards.
307- tmp := make (map [string ]V )
333+ tmp := make (map [K ]V )
308334
309335 // Insert items to temporary map.
310336 for item := range m .IterBuffered () {
311337 tmp [item .Key ] = item .Val
312338 }
313339 return json .Marshal (tmp )
314340}
341+ func strfnv32 [K fmt.Stringer ](key K ) uint32 {
342+ return fnv32 (key .String ())
343+ }
315344
316345func fnv32 (key string ) uint32 {
317346 hash := uint32 (2166136261 )
@@ -325,17 +354,17 @@ func fnv32(key string) uint32 {
325354}
326355
327356// Reverse process of Marshal.
328- func (m * ConcurrentMap [V ]) UnmarshalJSON (b []byte ) (err error ) {
329- tmp := make (map [string ]V )
330-
331- // Unmarshal into a single map.
332- if err := json .Unmarshal (b , & tmp ); err != nil {
333- return err
334- }
335-
336- // foreach key,value pair in temporary map insert into our concurrent map.
337- for key , val := range tmp {
338- m .Set (key , val )
339- }
357+ func (m * ConcurrentMap [K , V ]) UnmarshalJSON (b []byte ) (err error ) {
358+ tmp := make (map [K ]V )
359+
360+ // Unmarshal into a single map.
361+ if err := json .Unmarshal (b , & tmp ); err != nil {
362+ return err
363+ }
364+
365+ // foreach key,value pair in temporary map insert into our concurrent map.
366+ for key , val := range tmp {
367+ m .Set (key , val )
368+ }
340369 return nil
341370}
0 commit comments