@@ -52,6 +52,7 @@ defmodule Mosaic.ShardRouter do
5252
@doc """
Synchronously sends a `{:find_similar, query_vector, limit, opts}` request to
the router process registered under this module's name and returns its reply.

`opts` is forwarded untouched; the server-side handler reads
`:min_similarity` from it. The call waits up to 30 seconds (instead of the
`GenServer.call/3` default) before exiting with a timeout.
"""
def find_similar_shards(query_vector, limit, opts \\ []) do
  request = {:find_similar, query_vector, limit, opts}
  GenServer.call(__MODULE__, request, 30_000)
end
@doc """
Synchronously sends `:reset_state` to the router process registered under
this module's name and returns whatever the server replies.

Uses the default `GenServer.call/2` timeout.
"""
def reset_state do
  GenServer.call(__MODULE__, :reset_state)
end
@doc """
Synchronously sends `:get_cache_state` to the router process registered under
this module's name and returns its reply.

The matching handler replies with a map holding `:cache_keys` (sorted ids in
the cache table) and `:access_list` (sorted rows of the access table).
Uses the default `GenServer.call/2` timeout.
"""
def get_cache_state do
  GenServer.call(__MODULE__, :get_cache_state)
end
5556
5657 def handle_call ( { :find_similar , query_vector , limit , opts } , _from , state ) do
5758 min_similarity = Keyword . get ( opts , :min_similarity , Mosaic.Config . get ( :min_similarity ) )
@@ -73,15 +74,17 @@ defmodule Mosaic.ShardRouter do
7374 Exqlite.Sqlite3 . close ( state . routing_conn )
7475
7576 routing_db_path = Mosaic.Config . get ( :routing_db_path )
77+ max_size = Mosaic.Config . get ( :routing_cache_max_size ) # <-- Add this line
78+
7679 { :ok , conn } = Exqlite.Sqlite3 . open ( routing_db_path )
7780 initialize_routing_schema ( conn )
7881
7982 cache_table = :ets . new ( :shard_cache , [ :set , :private ] )
8083 access_table = :ets . new ( :shard_access , [ :ordered_set , :private ] )
8184 bloom_filters = load_bloom_filters ( conn )
82- counter = preload_hot_shards ( conn , state . max_size , cache_table , access_table , 0 )
85+ counter = preload_hot_shards ( conn , max_size , cache_table , access_table , 0 ) # <-- Use max_size
8386
84- new_state = % State { routing_conn: conn , cache_table: cache_table , access_table: access_table , access_counts: % { } , bloom_filters: bloom_filters , counter: counter , max_size: state . max_size , cache_hits: 0 , cache_misses: 0 }
87+ new_state = % State { routing_conn: conn , cache_table: cache_table , access_table: access_table , access_counts: % { } , bloom_filters: bloom_filters , counter: counter , max_size: max_size , cache_hits: 0 , cache_misses: 0 } # <-- Use max_size
8588 { :reply , :ok , new_state }
8689 end
8790
@@ -90,27 +93,39 @@ defmodule Mosaic.ShardRouter do
9093 { :reply , shards , state }
9194 end
9295
# Handles `:get_cache_state`: replies with a snapshot of both ETS tables.
#
# The reply is a map with
#   * `:cache_keys`  - the keys of every `{id, shard}` row in the cache table,
#                      sorted;
#   * `:access_list` - every row of the access table, sorted.
#
# The server state is returned unchanged.
def handle_call(:get_cache_state, _from, state) do
  cache_rows = :ets.tab2list(state.cache_table)
  access_rows = :ets.tab2list(state.access_table)

  sorted_ids =
    cache_rows
    |> Enum.map(fn {shard_id, _shard} -> shard_id end)
    |> Enum.sort()

  snapshot = %{cache_keys: sorted_ids, access_list: Enum.sort(access_rows)}

  {:reply, snapshot, state}
end
101+
# Answers a similarity query from the ETS shard cache when possible and falls
# back to the routing database otherwise.
#
# Parameters:
#   * `query_vector`     - the vector to compare shard centroids against.
#   * `limit`            - how many shards the caller wants.
#   * `min_similarity`   - shards scoring below this are discarded.
#   * `filter_ids`       - when truthy, only these shard ids are considered in
#                          the cache; otherwise every cached shard is a
#                          candidate.
#   * `vector_math_impl` - module providing `norm/1` and `cosine_similarity/4`.
#   * `state`            - server state (cache/access tables, counters).
#
# Returns `{shards, new_state}`.
#
# Cache hit: at least `limit` cached shards pass `min_similarity`. The top
# `limit` by similarity are returned and `cache_hits` grows by that amount.
#
# Cache miss: the query is delegated to `find_similar_db/6`, every returned
# shard is pushed through `add_to_cache/3` (which yields the next access
# counter), and `cache_misses` is bumped once for the whole query.
defp find_similar_cached(query_vector, limit, min_similarity, filter_ids, vector_math_impl, state) do
  query_norm = vector_math_impl.norm(query_vector)

  candidates =
    if filter_ids do
      # One lookup per id: `:ets.lookup/2` yields [] for ids that are not
      # cached, so flat_map drops them without a separate membership check
      # or a nil-filtering pass (and without the deprecated
      # `Enum.filter_map/3`).
      Enum.flat_map(filter_ids, fn id ->
        for {_key, shard} <- :ets.lookup(state.cache_table, id), do: shard
      end)
    else
      state.cache_table
      |> :ets.tab2list()
      |> Enum.map(fn {_id, shard} -> shard end)
    end

  cached_shards =
    candidates
    |> Enum.map(fn shard ->
      # Centroids are stored as serialized Erlang terms.
      centroid_vector = :erlang.binary_to_term(shard.centroid)

      similarity =
        vector_math_impl.cosine_similarity(
          query_vector,
          query_norm,
          centroid_vector,
          shard.centroid_norm
        )

      Map.put(shard, :similarity, similarity)
    end)
    |> Enum.filter(&(&1.similarity >= min_similarity))

  if length(cached_shards) >= limit do
    result =
      cached_shards
      |> Enum.sort_by(& &1.similarity, :desc)
      |> Enum.take(limit)

    {result, %{state | cache_hits: state.cache_hits + length(result)}}
  else
    db_shards =
      find_similar_db(query_vector, limit, min_similarity, filter_ids, vector_math_impl, state)

    # add_to_cache/3 returns the next free access counter, so it threads
    # straight through the reduce.
    new_counter =
      Enum.reduce(db_shards, state.counter, fn shard, cnt ->
        add_to_cache(shard, cnt, state)
      end)

    {db_shards, %{state | counter: new_counter, cache_misses: state.cache_misses + 1}}
  end
end
116131
@@ -135,17 +150,28 @@ defmodule Mosaic.ShardRouter do
135150 end
136151
# Stores `shard` in the cache and marks it as the most recently used entry.
#
# Parameters:
#   * `shard`   - a map/struct with an `:id` field; stored as `{shard.id, shard}`.
#   * `counter` - the access counter to stamp this access with.
#   * `state`   - must expose `cache_table`, `access_table` and `max_size`.
#
# Returns `counter + 1`, i.e. the next free access counter.
#
# Behaviour:
#   * Shard already cached: its previous access row is removed so each id has
#     a single row in the access table, and the cached value is overwritten
#     with the freshly supplied `shard` so the cache does not keep serving a
#     stale copy.
#   * Shard not cached and the cache holds `max_size` or more entries: the
#     entry with the smallest `{counter, id}` key in the ordered access table
#     (the least recently used) is evicted first.
defp add_to_cache(shard, counter, state) do
  if :ets.member(state.cache_table, shard.id) do
    # The counter part of the key is unknown here, so match on the id only.
    :ets.match_delete(state.access_table, {{:_, shard.id}, :_})
  else
    if :ets.info(state.cache_table, :size) >= state.max_size do
      case :ets.first(state.access_table) do
        :"$end_of_table" ->
          :ok

        {_old_counter, old_id} = oldest_key ->
          :ets.delete(state.access_table, oldest_key)
          :ets.delete(state.cache_table, old_id)
      end
    end
  end

  :ets.insert(state.cache_table, {shard.id, shard})
  :ets.insert(state.access_table, {{counter, shard.id}, true})
  counter + 1
end
150176
151177 defp filter_by_bloom ( keywords , bloom_filters ) do
@@ -182,6 +208,7 @@ defmodule Mosaic.ShardRouter do
182208
183209 defp update_access_stats ( shards , state ) do
184210 new_counts = Enum . reduce ( shards , state . access_counts , fn shard , counts -> Map . update ( counts , shard . id , 1 , & ( & 1 + 1 ) ) end )
211+
185212 new_counter = Enum . reduce ( shards , state . counter , fn shard , cnt ->
186213 shard_id = shard . id
187214 case :ets . lookup ( state . cache_table , shard_id ) do
@@ -192,6 +219,7 @@ defmodule Mosaic.ShardRouter do
192219 [ ] -> cnt
193220 end
194221 end )
222+
195223 % { state | access_counts: new_counts , counter: new_counter }
196224 end
197225
0 commit comments