Skip to content

Commit 0c77149

Browse files
vjnbenjchristensen
authored andcommitted
Further updates to the v/ version of Flowable (#345)
* Address lint warnings Lint warnings: single-argument constructors not marked explicit * Added tests for the v/ version. * Added a test for ->take() * Fix open source build Reviewed By: yschimke Differential Revision: D4921941 fbshipit-source-id: 1d7e2d13d78f2318c82ce67c00b73d0e641c0395 * Address lint warnings Lint warnings: single-argument constructors not marked explicit * Added tests for the v/ version. * Added a test for ->take() * Switched CMake version, to get past travis build * Use std::size_t instead of size_t * Fix build error (detected in travis) * Remove dependency on reactivestreams * Took out build-problematic typedef * Moved Wrapper out of Flowable, to fix the build * Add workaround for (old) gcc bug With lambdas in subclass methods invoking methods defined in a superclass, gcc (pre-6) complains of methods not having proper visibility. * Thread libraries for yarpl tests * Merged tests, disabled failing one * Added yarpl/tests to travis * Enable java-client to cpp-server tck tests Summary: cpp-client to java-server tck tests dont work yet Closes #344 Differential Revision: D4928525 Pulled By: lehecka fbshipit-source-id: 0c747080ad13d60a729acaefd584534aa6ba6278 * Address lint warnings Lint warnings: single-argument constructors not marked explicit * Added tests for the v/ version. * Added a test for ->take() * Switched CMake version, to get past travis build * Use std::size_t instead of size_t * Fix build error (detected in travis) * Remove dependency on reactivestreams * Took out build-problematic typedef * Moved Wrapper out of Flowable, to fix the build * Add workaround for (old) gcc bug With lambdas in subclass methods invoking methods defined in a superclass, gcc (pre-6) complains of methods not having proper visibility. * Thread libraries for yarpl tests * Merged tests, disabled failing one * Added yarpl/tests to travis
1 parent ab0ebda commit 0c77149

File tree

17 files changed

+499
-125
lines changed

17 files changed

+499
-125
lines changed

.travis.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ script:
6868
- cmake .. -DCMAKE_BUILD_TYPE=$BUILD_TYPE -DCMAKE_CXX_FLAGS="${CXX_FLAGS}" -DCMAKE_EXE_LINKER_FLAGS="${CXX_LINKER_FLAGS}" -DMETA_CXX_STD=$CPP_VERSION
6969
- make -j8
7070
- ./tests
71+
- ./experimental/yarpl/yarpl-tests
7172
- cd ..
7273
- ./scripts/prepare_tck_drivers.sh
7374
- ./scripts/tck_test.sh -c cpp -s cpp

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ target_link_libraries(
373373

374374
add_dependencies(tcpresumeserver gmock)
375375

376-
# add_subdirectory(experimental/yarpl)
376+
add_subdirectory(experimental/yarpl)
377377

378378
########################################
379379
# Examples

experimental/yarpl/CMakeLists.txt

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
cmake_minimum_required (VERSION 3.4)
1+
cmake_minimum_required (VERSION 3.2)
22

33
# To debug the project, set the build type.
44
set(CMAKE_BUILD_TYPE Debug)
@@ -15,14 +15,16 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS 1)
1515
# Common configuration for all build modes.
1616
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14")
1717
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -Wno-unused-parameter")
18-
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-weak-vtables -Wpadded")
18+
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-weak-vtables -Wno-padded")
1919
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-omit-frame-pointer")
2020
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -momit-leaf-frame-pointer")
2121

2222
# Configuration for Debug build mode.
2323
#set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fsanitize=address")
2424
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG}")
2525

26+
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -DNDEBUG")
27+
2628
include_directories(${CMAKE_SOURCE_DIR})
2729

2830

@@ -83,6 +85,7 @@ add_library(
8385
include/yarpl/v/Subscriber.h
8486
include/yarpl/v/Subscribers.h
8587
include/yarpl/v/Subscription.h
88+
src/yarpl/v/Refcounted.cpp
8689
)
8790

8891
target_include_directories(
@@ -134,12 +137,15 @@ add_executable(
134137
test/flowable_operators/Flowable_Take_test.cpp
135138
test/flowable_operators/Flowable_SubscribeOn_test.cpp
136139
test/Scheduler_test.cpp
140+
test/v/RefcountedTest.cpp
141+
test/v/FlowableTest.cpp
137142
)
138143

139144
target_link_libraries(
140145
yarpl-tests
141146
yarpl
142147
${GMOCK_LIBS} # inherited from reactivesocket-cpp CMake
148+
${CMAKE_THREAD_LIBS_INIT}
143149
)
144150

145151
target_include_directories(

experimental/yarpl/examples/FlowableVExamples.cpp

Lines changed: 47 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include "FlowableVExamples.h"
44

55
#include <iostream>
6+
#include <sstream>
67
#include <string>
78
#include <thread>
89

@@ -22,6 +23,12 @@ auto printer() {
2223
2 /* low [optional] batch size for demo */);
2324
}
2425

26+
std::string getThreadId() {
27+
std::ostringstream oss;
28+
oss << std::this_thread::get_id();
29+
return oss.str();
30+
}
31+
2532
} // namespace
2633

2734
void FlowableVExamples::run() {
@@ -56,22 +63,44 @@ void FlowableVExamples::run() {
5663

5764
std::cout << "take example: 3 out of 10 items" << std::endl;
5865
Flowables::range(1, 11)->take(3)->subscribe(printer<int64_t>());
59-
}
6066

61-
// ThreadScheduler scheduler;
62-
63-
// FlowablesC::range(1, 10)
64-
// ->subscribeOn(scheduler)
65-
// ->map([](auto i) {
66-
// std::this_thread::sleep_for(std::chrono::milliseconds(400));
67-
// return "mapped->" + std::to_string(i);
68-
// })
69-
// ->take(2)
70-
// ->subscribe(Subscribers::create<std::string>([](auto t) {
71-
// std::cout << "Value received after scheduling: " << t << std::endl;
72-
// }));
73-
74-
// // wait to see above async example
75-
// /* sleep override */
76-
// std::this_thread::sleep_for(std::chrono::milliseconds(1300));
77-
//}
67+
auto flowable = Flowable<int>::create(
68+
[total=0](Subscriber<int>& subscriber, int64_t requested) mutable {
69+
subscriber.onNext(12345678);
70+
subscriber.onError(std::make_exception_ptr(
71+
std::runtime_error("error")));
72+
return std::make_tuple(int64_t{1}, false);
73+
}
74+
);
75+
76+
auto subscriber = Subscribers::create<int>(
77+
[](int next) {
78+
std::cout << "@next: " << next << std::endl;
79+
},
80+
[](std::exception_ptr eptr) {
81+
try {
82+
std::rethrow_exception(eptr);
83+
} catch (const std::exception& exception) {
84+
std::cerr << " exception: " << exception.what() << std::endl;
85+
} catch (...) {
86+
std::cerr << " !unknown exception!" << std::endl;
87+
}
88+
},
89+
[] {
90+
std::cout << "Completed." << std::endl;
91+
}
92+
);
93+
94+
flowable->subscribe(subscriber);
95+
96+
ThreadScheduler scheduler;
97+
98+
std::cout << "subscribe_on example" << std::endl;
99+
Flowables::just({ "0: ", "1: ", "2: " })
100+
->map([](const char* p) { return std::string(p); })
101+
->map([](std::string log) { return log + " on " + getThreadId(); })
102+
->subscribeOn(scheduler)
103+
->subscribe(printer<std::string>());
104+
std::cout << " waiting on " << getThreadId() << std::endl;
105+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
106+
}

experimental/yarpl/examples/yarpl-playground.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
int main() {
1212
std::cout << "*** Run yarpl::flowable::v examples ***" << std::endl;
13-
// FlowableVExamples::run();
13+
FlowableVExamples::run();
1414
// std::cout << "*** Run ObservableExamples ***" << std::endl;
1515
// ObservableExamples::run();
1616
// std::cout << "*** Run FlowableExamples ***" << std::endl;

experimental/yarpl/include/yarpl/v/Flowable.h

Lines changed: 43 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
#include <type_traits>
88
#include <utility>
99

10-
#include "reactivestreams/ReactiveStreams.h"
10+
#include "yarpl/Scheduler.h"
1111
#include "yarpl/utils/type_traits.h"
1212

1313
#include "Refcounted.h"
@@ -21,17 +21,17 @@ class Flowable : public virtual Refcounted {
2121
static const auto CANCELED = std::numeric_limits<int64_t>::min();
2222
static const auto NO_FLOW_CONTROL = std::numeric_limits<int64_t>::max();
2323

24-
using Subscriber = Subscriber<T>;
25-
26-
virtual void subscribe(Reference<Subscriber>) = 0;
24+
virtual void subscribe(Reference<Subscriber<T>>) = 0;
2725

2826
template <typename Function>
2927
auto map(Function&& function);
3028

3129
auto take(int64_t);
3230

31+
auto subscribeOn(Scheduler&);
32+
3333
/**
34-
* Create a flowable from an emitter.
34+
* \brief Create a flowable from an emitter.
3535
*
3636
* \param emitter function that is invoked to emit values to a subscriber.
3737
* The emitter's signature is:
@@ -46,48 +46,32 @@ class Flowable : public virtual Refcounted {
4646
*
4747
* \return a handle to a flowable that will use the emitter.
4848
*/
49+
template<typename Emitter>
50+
class EmitterWrapper;
51+
4952
template <
5053
typename Emitter,
5154
typename = typename std::enable_if<std::is_callable<
52-
Emitter(Subscriber&, int64_t),
55+
Emitter(Subscriber<T>&, int64_t),
5356
std::tuple<int64_t, bool>>::value>::type>
5457
static auto create(Emitter&& emitter);
5558

5659
private:
57-
virtual std::tuple<int64_t, bool> emit(Subscriber&, int64_t) {
60+
virtual std::tuple<int64_t, bool> emit(Subscriber<T>&, int64_t) {
5861
return std::make_tuple(static_cast<int64_t>(0), false);
5962
}
6063

61-
template <typename Emitter>
62-
class Wrapper : public Flowable {
63-
public:
64-
Wrapper(Emitter&& emitter) : emitter_(std::forward<Emitter>(emitter)) {}
65-
66-
virtual void subscribe(Reference<Subscriber> subscriber) {
67-
new SynchronousSubscription(this, std::move(subscriber));
68-
}
69-
70-
virtual std::tuple<int64_t, bool> emit(
71-
Subscriber& subscriber,
72-
int64_t requested) {
73-
return emitter_(subscriber, requested);
74-
}
75-
76-
private:
77-
Emitter emitter_;
78-
};
79-
8064
/**
8165
* Manager for a flowable subscription.
8266
*
8367
* This is synchronous: the emit calls are triggered within the context
8468
* of a request(n) call.
8569
*/
86-
class SynchronousSubscription : public Subscription, public Subscriber {
70+
class SynchronousSubscription : public Subscription, public Subscriber<T> {
8771
public:
8872
SynchronousSubscription(
8973
Reference<Flowable> flowable,
90-
Reference<Subscriber> subscriber)
74+
Reference<Subscriber<T>> subscriber)
9175
: flowable_(std::move(flowable)), subscriber_(std::move(subscriber)) {
9276
subscriber_->onSubscribe(Reference<Subscription>(this));
9377
}
@@ -208,7 +192,7 @@ class Flowable : public virtual Refcounted {
208192
std::mutex processing_;
209193

210194
Reference<Flowable> flowable_;
211-
Reference<Subscriber> subscriber_;
195+
Reference<Subscriber<T>> subscriber_;
212196
};
213197
};
214198

@@ -218,11 +202,34 @@ class Flowable : public virtual Refcounted {
218202

219203
namespace yarpl {
220204

205+
template <typename T>
206+
template <typename Emitter>
207+
class Flowable<T>::EmitterWrapper : public Flowable<T> {
208+
public:
209+
explicit EmitterWrapper(Emitter&& emitter)
210+
: emitter_(std::forward<Emitter>(emitter)) {}
211+
212+
virtual void subscribe(Reference<Subscriber<T>> subscriber) {
213+
new SynchronousSubscription(
214+
Reference<Flowable>(this), std::move(subscriber));
215+
}
216+
217+
virtual std::tuple<int64_t, bool> emit(
218+
Subscriber<T>& subscriber,
219+
int64_t requested) {
220+
return emitter_(subscriber, requested);
221+
}
222+
223+
private:
224+
Emitter emitter_;
225+
};
226+
221227
template <typename T>
222228
template <typename Emitter, typename>
223229
auto Flowable<T>::create(Emitter&& emitter) {
224230
return Reference<Flowable<T>>(
225-
new Flowable<T>::Wrapper<Emitter>(std::forward<Emitter>(emitter)));
231+
new Flowable<T>::EmitterWrapper<Emitter>(
232+
std::forward<Emitter>(emitter)));
226233
}
227234

228235
template <typename T>
@@ -239,4 +246,10 @@ auto Flowable<T>::take(int64_t limit) {
239246
new TakeOperator<T>(Reference<Flowable<T>>(this), limit));
240247
}
241248

249+
template<typename T>
250+
auto Flowable<T>::subscribeOn(Scheduler& scheduler) {
251+
return Reference<Flowable<T>>(
252+
new SubscribeOnOperator<T>(Reference<Flowable<T>>(this), scheduler));
253+
}
254+
242255
} // yarpl

0 commit comments

Comments
 (0)