2525#include < chrono>
2626#include < condition_variable>
2727#include < mutex>
28+ #include < queue>
29+ #include < sstream>
30+ #include < thread>
2831
2932#include < async++.h>
3033
@@ -51,19 +54,21 @@ namespace geode
5154 Storage ( std::unique_ptr< geode::Identifier >&& data )
5255 : data_{ std::move ( data ) }, count_{ count++ }
5356 {
57+ Logger::debug ( count_, " -> " , " Storage" );
5458 }
5559
5660 ~Storage ()
5761 {
58- Logger::debug ( count, " -> " , " ~Storage" );
62+ Logger::debug ( count_, " -> " , " ~Storage" );
63+ terminate_storage ();
5964 std::unique_lock< std::mutex > locking{ lock_ };
60- terminate_ = true ;
61- condition_. notify_all ( );
62- condition_. wait ( locking, [ this ] {
63- Logger::debug ( count , " -> " , nb_calls_ );
64- return nb_calls_ == 0 ;
65+ condition_. wait ( locking, [&] {
66+ Logger::debug ( count_, " -> " , " ~calls " , queue_. size () );
67+ clean_queue ();
68+ Logger::debug ( count_ , " -> " , " ~calls2 " , queue_. size () );
69+ return queue_. empty () ;
6570 } );
66- Logger::debug ( count , " -> " , " ~Storage end" );
71+ Logger::debug ( count_ , " -> " , " ~Storage end" );
6772 }
6873
6974 bool expired () const
@@ -80,6 +85,9 @@ namespace geode
8085 {
8186 const std::lock_guard< std::mutex > locking{ lock_ };
8287 counter_++;
88+ std::ostringstream oss;
89+ oss << std::this_thread::get_id () << " " << this ;
90+ Logger::debug ( count_, " -> " , " new " , counter_, " " , oss.str () );
8391 }
8492
8593 void delete_data_reference ()
@@ -88,16 +96,26 @@ namespace geode
8896 OPENGEODE_ASSERT (
8997 counter_ > 0 , " [Database::Storage] Cannot decrement" );
9098 counter_--;
99+ std::ostringstream oss;
100+ oss << std::this_thread::get_id () << " " << this ;
101+ Logger::debug (
102+ count_, " -> " , " delete " , counter_, " " , oss.str () );
91103 if ( unused () )
92104 {
105+ clean_queue ();
93106 wait_for_memory_release ();
94107 }
95108 }
96109
97- void set_data ( std::unique_ptr< geode::Identifier >&& data )
98- {
99- data_ = std::move ( data );
100- }
110+ // void set_data( std::unique_ptr< geode::Identifier >&& data )
111+ // {
112+ // const std::lock_guard< std::mutex > locking{ lock_ };
113+ // terminate_ = false;
114+ // counter_ = 0;
115+ // count_ = count++;
116+ // Logger::debug( count_, " -> ", "set_data " );
117+ // data_ = std::move( data );
118+ // }
101119
102120 const std::unique_ptr< geode::Identifier >& data () const
103121 {
@@ -110,44 +128,75 @@ namespace geode
110128 }
111129
112130 private:
131+ void terminate_storage ()
132+ {
133+ std::ostringstream oss;
134+ oss << std::this_thread::get_id () << " " << this ;
135+ Logger::debug ( count_, " -> " , " begin terminate_storage" );
136+ terminate_ = true ;
137+ Logger::debug (
138+ count_, " -> " , " calls " , queue_.size (), " " , oss.str () );
139+ condition_.notify_all ();
140+ Logger::debug ( count_, " -> " , " end terminate_storage" );
141+ }
142+
143+ void clean_queue ()
144+ {
145+ while ( !queue_.empty () )
146+ {
147+ if ( !queue_.front ().ready () )
148+ {
149+ return ;
150+ }
151+ queue_.pop ();
152+ }
153+ }
154+
113155 void wait_for_memory_release ()
114156 {
115- last_used_ = std::chrono::system_clock::now ();
116- async::spawn ( [this ] {
117- Logger::debug ( count, " -> " , " wait" );
157+ queue_.emplace ( async::spawn ( [this ] {
158+ std::ostringstream oss;
159+ oss << std::this_thread::get_id () << " " << this ;
160+ Logger::debug ( count_, " -> " , " wait start " , oss.str () );
161+ Logger::debug ( count_, " -> " , " wait start 2 " , oss.str () );
118162 std::unique_lock< std::mutex > locking{ lock_ };
119- nb_calls_++ ;
120- Logger::debug ( count , " -> " , " wait 2" );
163+ last_used_ = std::chrono::system_clock::now () ;
164+ Logger::debug ( count_ , " -> " , " wait 2 + " , oss. str () );
121165 if ( !condition_.wait_for ( locking,
122- DATA_EXPIRATION + std::chrono::seconds ( 1 ), [this ] {
123- Logger::debug ( count, " -> " , terminate_ );
124- return terminate_;
166+ DATA_EXPIRATION + std::chrono::seconds ( 1 ),
167+ [this , &oss] {
168+ Logger::debug ( count_, " -> " , " terminate " ,
169+ terminate_.load (), " " , oss.str () );
170+ return terminate_.load ();
125171 } ) )
126172 {
127- Logger::debug ( count , " -> " , " wait in" );
173+ Logger::debug ( count_ , " -> " , " wait in" , " " , oss. str () );
128174 if ( !terminate_ && unused ()
129175 && std::chrono::system_clock::now () - last_used_
130176 > DATA_EXPIRATION )
131177 {
132- Logger::debug ( count, " -> " , " wait reset" );
178+ Logger::debug ( count_, " -> " , " wait reset" , " " ,
179+ oss.str (), " " ,
180+ reinterpret_cast < size_t >( this ) );
133181 data_.reset ();
134182 }
135183 }
136- Logger::debug ( count, " -> " , " wait out" );
137- nb_calls_--;
184+ Logger::debug ( count_, " -> " , " wait out + " , queue_.size (),
185+ " " , oss.str () );
186+ locking.unlock ();
138187 condition_.notify_all ();
139- } );
188+ } ) ) ;
140189 }
141190
142191 private:
143192 std::unique_ptr< Identifier > data_;
144- bool terminate_{ false };
145- index_t nb_calls_{ 0 };
193+ std::atomic< bool > terminate_{ false };
146194 index_t counter_{ 0 };
147195 std::chrono::time_point< std::chrono::system_clock > last_used_;
148196 std::mutex lock_;
149197 std::condition_variable condition_;
150198 int count_;
199+ std::queue< async::task< void > > queue_;
151200 };
152201
153202 class Database ::Impl
@@ -220,13 +269,13 @@ namespace geode
220269 const auto it = storage_.find ( id );
221270 if ( it != storage_.end () )
222271 {
223- if ( it->second ->unused () )
224- {
225- it->second ->set_data ( std::move ( data ) );
226- return it->second ->data ();
227- }
272+ // if( it->second->unused() )
273+ // {
274+ // it->second->set_data( std::move( data ) );
275+ // return it->second->data();
276+ // }
228277 delete_data ( id );
229- return do_register_data ( std::move ( data ) );
278+ // return do_register_data( std::move( data ) );
230279 }
231280 return do_register_data ( std::move ( data ) );
232281 }
0 commit comments