Skip to content

Commit 0f785d7

Browse files
committed
Kotlin Examples
1 parent 201f053 commit 0f785d7

File tree

6 files changed

+610
-216
lines changed

6 files changed

+610
-216
lines changed

language-adaptors/rxjava-kotlin/build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ buildscript {
44
}
55

66
dependencies {
7-
classpath 'org.jetbrains.kotlin:kotlin-gradle-plugin:0.6.+'
7+
classpath 'org.jetbrains.kotlin:kotlin-gradle-plugin:0.6.602'
88
}
99
}
1010

@@ -13,7 +13,7 @@ apply plugin: 'osgi'
1313

1414
dependencies {
1515
compile project(':rxjava-core')
16-
compile 'org.jetbrains.kotlin:kotlin-stdlib:0.6.+'
16+
compile 'org.jetbrains.kotlin:kotlin-stdlib:0.6.602'
1717
provided 'junit:junit-dep:4.10'
1818
provided 'org.mockito:mockito-core:1.8.5'
1919
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package rx.lang.kotlin.examples
2+
3+
import rx.Observable
4+
import rx.Observer
5+
import rx.subscriptions.Subscriptions
6+
import rx.lang.kotlin.asObservable
7+
import kotlin.concurrent.thread
8+
import rx.Subscription
9+
import java.net.URL
10+
import java.util.Scanner
11+
12+
/**
13+
* Created by IntelliJ IDEA.
14+
* @author Mario Arias
15+
* Date: 28/09/13
16+
* Time: 3:00
17+
*/
18+
19+
fun main(args: Array<String>) {
20+
hello(array("Ben", "George"))
21+
customObservableNonBlocking().subscribe { println(it) }
22+
customObservableBlocking().subscribe { println(it) }
23+
val printArticle: (String?) -> Unit = {
24+
println("""--- Article ---
25+
${it!!.substring(0, 125)}
26+
""")
27+
}
28+
fetchWikipediaArticleAsynchronously("Tiger", "Elephant").subscribe(printArticle)
29+
simpleComposition()
30+
31+
fetchWikipediaArticleAsynchronouslyWithErrorHandling("Tiger", "NonExistentTitle", "Elephant").subscribe (printArticle) {
32+
println("""--- Error ---
33+
${it!!.getMessage()}
34+
""")
35+
}
36+
}
37+
38+
fun hello(names: Array<String>) {
39+
Observable.from(names)!!.subscribe { s -> println("Hello $s!") }
40+
}
41+
42+
fun customObservableBlocking(): Observable<String> {
43+
return {(observer: Observer<in String>) ->
44+
(0..50).forEach { i ->
45+
observer.onNext("value_$i")
46+
}
47+
observer.onCompleted()
48+
Subscriptions.empty()!!
49+
}.asObservable()
50+
}
51+
52+
fun customObservableNonBlocking(): Observable<String> {
53+
return {(observer: Observer<in String>) ->
54+
val t = thread {
55+
(0..50).forEach { i ->
56+
observer.onNext("anotherValue_$i")
57+
}
58+
observer.onCompleted()
59+
}
60+
Subscription {
61+
t.interrupt()
62+
}
63+
}.asObservable()
64+
}
65+
66+
fun fetchWikipediaArticleAsynchronously(vararg wikipediaArticleNames: String): Observable<String> {
67+
return {(observer: Observer<in String>) ->
68+
thread {
69+
wikipediaArticleNames.forEach { article ->
70+
observer.onNext(URL("http://en.wikipedia.org/wiki/$article").getText())
71+
}
72+
observer.onCompleted()
73+
}
74+
Subscriptions.empty()!!
75+
}.asObservable()
76+
}
77+
78+
fun simpleComposition() {
79+
customObservableNonBlocking()
80+
.skip(10)!!
81+
.take(5)!!
82+
.map { s -> "${s}_transformed" }!!
83+
.subscribe { println("onNext => $it") }
84+
}
85+
86+
fun fetchWikipediaArticleAsynchronouslyWithErrorHandling(vararg wikipediaArticleNames: String): Observable<String> {
87+
return {(observer: Observer<in String>) ->
88+
thread {
89+
try {
90+
wikipediaArticleNames.forEach { article ->
91+
observer.onNext(URL("http://en.wikipedia.org/wiki/$article").getText())
92+
}
93+
observer.onCompleted()
94+
} catch(e: Exception) {
95+
observer.onError(e)
96+
}
97+
}
98+
Subscriptions.empty()!!
99+
}.asObservable()
100+
}
101+
102+
103+
//Extensions
104+
fun URL.getText(): String {
105+
val scanner = Scanner(this.openStream()!!)
106+
val sb = StringBuilder(1024)
107+
while(scanner.hasNextLine()){
108+
sb.append(scanner.nextLine())
109+
sb.append('\n')
110+
}
111+
return sb.toString()
112+
}
113+
114+
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
package rx.lang.kotlin.examples.video
2+
3+
import rx.Observable
4+
5+
import rx.Observer
6+
import rx.Subscription
7+
import rx.lang.kotlin.asObservable
8+
import java.util.concurrent.LinkedBlockingQueue
9+
import java.util.concurrent.TimeUnit
10+
import java.util.concurrent.ThreadPoolExecutor
11+
import rx.subscriptions.BooleanSubscription
12+
import java.util.HashMap
13+
14+
fun main(args: Array<String>) {
15+
getVideoGridForDisplay(1).subscribe({ videoDictionary ->
16+
println(videoDictionary)
17+
}, { exception ->
18+
println("Error:$exception")
19+
}) {
20+
executor.shutdownNow()
21+
}
22+
}
23+
24+
val executor = ThreadPoolExecutor(4, 4, 1, TimeUnit.MINUTES, LinkedBlockingQueue<Runnable>())
25+
26+
fun getListOfLists(userId: Int): Observable<VideoList> {
27+
return {(observer: Observer<in VideoList>) ->
28+
val subscription = BooleanSubscription()
29+
try{
30+
executor.execute {
31+
Thread.sleep(180)
32+
(0..15).forEach {(i: Int):Unit ->
33+
if (subscription.isUnsubscribed()) {
34+
return@forEach
35+
}
36+
try{
37+
observer.onNext(VideoList(i))
38+
} catch (e: Exception){
39+
observer.onError(e)
40+
}
41+
}
42+
}
43+
} catch (e: Exception){
44+
observer.onError(e)
45+
}
46+
subscription
47+
}.asObservable()
48+
}
49+
50+
fun getVideoGridForDisplay(userId: Int): Observable<Map<String, Any?>> {
51+
return getListOfLists(userId).take(5)!!.mapMany { list ->
52+
list!!.videos.take(10)!!.mapMany { video ->
53+
val m = video!!.metadata.map { md ->
54+
mapOf("title" to md!!["title"], "lenght" to md["duration"])
55+
}
56+
val b = video.getBookmark(userId).map { position ->
57+
mapOf("bookmark" to position)
58+
}
59+
val r = video.getRating(userId).map { rating ->
60+
mapOf("rating" to mapOf(
61+
"actual" to rating!!.actualStarRating,
62+
"average" to rating.averageStarRating,
63+
"predicted" to rating.predictedStarRating
64+
))
65+
}
66+
67+
Observable.zip(m, b, r) { metadata, bookmark, rating ->
68+
val map: HashMap<String, Any?> = hashMapOf("id" to video.videoId)
69+
map.putAll(metadata!!)
70+
map.putAll(bookmark!!)
71+
map.putAll(rating!!)
72+
map as Map<String, Any?>
73+
}
74+
}
75+
}!!
76+
}
77+
78+
class Video(val videoId: Int){
79+
val metadata: Observable<Map<String, String>>
80+
get(){
81+
return {(observer: Observer<in Map<String, String>>) ->
82+
observer.onNext(mapOf("title" to "video-$videoId-title", "duration" to "5428"))
83+
Subscription { }
84+
}.asObservable()
85+
}
86+
87+
fun getBookmark(userId: Int): Observable<Int> {
88+
return {(observer: Observer<in Int>) ->
89+
executor.execute {
90+
Thread.sleep(4)
91+
if(randInt(6) > 1){
92+
observer.onNext(randInt(0))
93+
} else {
94+
observer.onNext(randInt(4000))
95+
}
96+
observer.onCompleted()
97+
}
98+
Subscription { }
99+
}.asObservable()
100+
}
101+
102+
fun getRating(userId: Int): Observable<VideoRating> {
103+
return {(observer: Observer<in VideoRating>) ->
104+
executor.execute {
105+
Thread.sleep(10)
106+
observer.onNext(VideoRating(videoId, userId))
107+
observer.onCompleted()
108+
}
109+
Subscription { }
110+
}.asObservable()
111+
}
112+
}
113+
114+
class VideoRating(val videoId: Int, val userId: Int){
115+
val predictedStarRating: Int
116+
get(){
117+
return randInt(5)
118+
}
119+
120+
val averageStarRating: Int
121+
get(){
122+
return randInt(4)
123+
}
124+
125+
val actualStarRating: Int
126+
get(){
127+
return randInt(5)
128+
}
129+
}
130+
131+
class VideoList(val position: Int){
132+
val listName: String
133+
get(){
134+
return "ListName-$position"
135+
}
136+
137+
val videos: Observable<Video>
138+
get(){
139+
return {(observer: Observer<in Video>) ->
140+
(0..50).forEach { i ->
141+
observer.onNext(Video((1000 * position) + i))
142+
}
143+
observer.onCompleted()
144+
Subscription { }
145+
}.asObservable()
146+
}
147+
148+
}
149+
150+
fun randInt(max: Int): Int {
151+
return Math.round(Math.random() * max).toInt()
152+
}

0 commit comments

Comments
 (0)