go - Golang data race even with mutex for custom concurrent maps -
here simple concurrent map wrote learning purpose
package concurrent_hashmap import ( "hash/fnv" "sync" ) type concurrentmap struct { buckets []threadsafemap bucketcount uint32 } type threadsafemap struct { maplock sync.rwmutex hashmap map[string]interface{} } func newconcurrentmap(bucketsize uint32) *concurrentmap { var threadsafemapinstance threadsafemap var bucketofthreadsafemap []threadsafemap := 0; <= int(bucketsize); i++ { threadsafemapinstance = threadsafemap{sync.rwmutex{}, make(map[string]interface{})} bucketofthreadsafemap = append(bucketofthreadsafemap, threadsafemapinstance) } return &concurrentmap{bucketofthreadsafemap, bucketsize} } func (cmap *concurrentmap) put(key string, val interface{}) { bucketindex := hash(key) % cmap.bucketcount bucket := cmap.buckets[bucketindex] bucket.maplock.lock() bucket.hashmap[key] = val bucket.maplock.unlock() } // helper func hash(s string) uint32 { h := fnv.new32a() h.write([]byte(s)) return h.sum32() }
i trying write simple benchmark , find synchronize access work correctly concurrent access
fatal error: concurrent map writes
here benchmark run go test -bench=. -race
package concurrent_hashmap import ( "testing" "runtime" "math/rand" "strconv" "sync" ) // concurrent not work func benchmarkmyfunc(b *testing.b) { var wg sync.waitgroup runtime.gomaxprocs(runtime.numcpu()) my_map := newconcurrentmap(uint32(4)) n := 0; n < b.n; n++ { go insert(my_map, wg) } wg.wait() } func insert(my_map *concurrentmap, wg sync.waitgroup) { wg.add(1) var rand_int int element_num := 0; element_num < 1000; element_num++ { rand_int = rand.intn(100) my_map.put(strconv.itoa(rand_int), rand_int) } defer wg.done() } // works func benchmarkmyfuncsynchronize(b *testing.b) { my_map := newconcurrentmap(uint32(4)) n := 0; n < b.n; n++ { my_map.put(strconv.itoa(123), 123) } }
the warning: data race
saying bucket.hashmap[key] = val
causing problem, confused on why possible, since lock logic whenever write happening.
i think missing basic, can point out mistake?
thanks
edit1:
not sure if helps here mutex looks if don't lock anything
{{0 0} 0 0 0 0}
here looks if lock write
{{1 0} 0 0 -1073741824 0}
not sure why readercount low negative number
edit:2
i think find issue at, not sure why have code way
the issue
type threadsafemap struct { maplock sync.rwmutex // causing problem hashmap map[string]interface{} }
it should be
type threadsafemap struct { maplock *sync.rwmutex hashmap map[string]interface{} }
another weird thing in put
if put print statement inside lock
bucket.maplock.lock() fmt.println("start") fmt.println(bucket) fmt.println(bucketindex) fmt.println(bucket.maplock) fmt.println(&bucket.maplock) bucket.hashmap[key] = val defer bucket.maplock.unlock()
the following prints possible
start start {0x4212861c0 map[123:123]} {0x4212241c0 map[123:123]}
its weird because each start
printout should follow 4 lines of bucket info since cannot have start
back because indicate multiple thread access line inside lock
also reason each bucket.maplock
have different address if make bucketindex static, indicate not accessing same lock.
but despite above weirdness changing mutex pointer solves problem
i love find out why need pointers mutex , why prints seem indicate multiple thread accessing lock , why each lock has different address.
the problem statement
bucket := cmap.buckets[bucketindex]
bucket
contains copy of threadsafemap
@ index. sync.rwmutex
stored value, copy of made while assigning. map maps hold references underlying data structure, copy of pointer or same map passed. code locks copy of lock while writing single map, cause problem.
thats why don't face problem when change sync.rwmutex
*sync.rwmutex
. it's better store reference structure in map shown.
package concurrent_hashmap import ( "hash/fnv" "sync" ) type concurrentmap struct { buckets []*threadsafemap bucketcount uint32 } type threadsafemap struct { maplock sync.rwmutex hashmap map[string]interface{} } func newconcurrentmap(bucketsize uint32) *concurrentmap { var threadsafemapinstance *threadsafemap var bucketofthreadsafemap []*threadsafemap := 0; <= int(bucketsize); i++ { threadsafemapinstance = &threadsafemap{sync.rwmutex{}, make(map[string]interface{})} bucketofthreadsafemap = append(bucketofthreadsafemap, threadsafemapinstance) } return &concurrentmap{bucketofthreadsafemap, bucketsize} } func (cmap *concurrentmap) put(key string, val interface{}) { bucketindex := hash(key) % cmap.bucketcount bucket := cmap.buckets[bucketindex] bucket.maplock.lock() bucket.hashmap[key] = val bucket.maplock.unlock() } // helper func hash(s string) uint32 { h := fnv.new32a() h.write([]byte(s)) return h.sum32() }
it's possible validate scenario modifying function put
follows
func (cmap *concurrentmap) put(key string, val interface{}) { //fmt.println("index", key) bucketindex := 1 bucket := cmap.buckets[bucketindex] fmt.printf("%p %p\n", &(bucket.maplock), bucket.hashmap) }
Comments
Post a Comment