Skip to content
This repository was archived by the owner on Aug 18, 2020. It is now read-only.

Commit 565a702

Browse files
committed
Merge branch 'feature/129-streamelements-connector' into develop
2 parents 12cbcb6 + aa16f1c commit 565a702

File tree

4 files changed

+305
-0
lines changed

4 files changed

+305
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
<connector>
2+
<display>StreamElements</display>
3+
<description>By using the StreamElements service you can react various events of your stream like donations, subscriptions or follows. Just listen to the events provided by the StreamElements event input.</description>
4+
<wiki>https://github.com/codeoverflow-org/chatoverflow/wiki/StreamElements</wiki>
5+
<icon48>
6+

7+
</icon48>
8+
</connector>
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package org.codeoverflow.chatoverflow.requirement.service.streamelements
2+
3+
import io.socket.client.Socket._
4+
import io.socket.client.{IO, Socket}
5+
import org.codeoverflow.chatoverflow.WithLogger
6+
import org.codeoverflow.chatoverflow.connector.EventConnector
7+
import org.json.JSONObject
8+
9+
class StreamElementsConnector(sourceIdentifier: String) extends EventConnector(sourceIdentifier) with WithLogger {
10+
override protected var requiredCredentialKeys: List[String] = List("jwt-token")
11+
override protected var optionalCredentialKeys: List[String] = List()
12+
13+
private val TIMEOUT = 10000
14+
private val SOCKET_URL = "https://realtime.streamelements.com"
15+
private var socket: Option[Socket] = None
16+
private var connected: Option[Boolean] = None
17+
private val listener = new StreamElementsListener
18+
listener.registerEventHandler((e, ct) => call(e)(ct)) // pass events to in/output.
19+
20+
override def start(): Boolean = {
21+
logger info "Connecting to the StreamElements websocket..."
22+
23+
val opts = new IO.Options()
24+
opts.transports = Array("websocket")
25+
26+
socket = Some(IO.socket(SOCKET_URL, opts).connect())
27+
registerSocketEvents(socket.get)
28+
29+
connected.synchronized {
30+
connected.wait(TIMEOUT)
31+
}
32+
33+
connected.getOrElse({
34+
logger warn "Could not connect to StreamElements socket: Timed out!"
35+
false
36+
})
37+
}
38+
39+
private def registerSocketEvents(s: Socket): Unit = {
40+
def setConnected(isConnected: Boolean): Unit = connected.synchronized {
41+
connected.notify()
42+
connected = Some(isConnected)
43+
}
44+
45+
s.on(EVENT_CONNECT, (_: Any) => {
46+
logger info "Successfully connected to the StreamElements websocket."
47+
48+
val authObj = new JSONObject()
49+
.put("method", "jwt")
50+
.put("token", credentials.get.getValue("jwt-token").get)
51+
52+
logger info "Authenticating with the StreamElements websocket..."
53+
s.emit("authenticate", authObj)
54+
})
55+
56+
s.on(EVENT_CONNECT_ERROR, (e: Array[AnyRef]) => {
57+
logger warn s"Could not connect to StreamElements socket:"
58+
logger warn e.mkString(", ")
59+
60+
setConnected(false)
61+
})
62+
63+
s.on(EVENT_CONNECT_TIMEOUT, (_: Any) => {
64+
setConnected(false)
65+
})
66+
67+
s.on(EVENT_ERROR, (e: Array[AnyRef]) => {
68+
logger warn s"StreamElements($sourceIdentifier) socket error:"
69+
logger warn e.mkString(", ")
70+
})
71+
72+
s.on(EVENT_DISCONNECT, (_: Any) => {
73+
logger info "Disconnected from the StreamElements websocket."
74+
})
75+
76+
s.on("authenticated", (_: Any) => {
77+
logger info "Successfully authenticated to the StreamElements websocket."
78+
79+
setConnected(true)
80+
})
81+
82+
s.on("event", (event: Array[AnyRef]) => listener.handleEvent(event))
83+
}
84+
85+
override def stop(): Boolean = {
86+
if (socket.isDefined) {
87+
socket.get.close()
88+
}
89+
connected = None
90+
socket = None
91+
true
92+
}
93+
}
94+
95+
object StreamElementsConnector {
96+
private[streamelements] sealed class StreamElementsEventJSON(json: JSONObject)
97+
private[streamelements] case class SubscriptionEventJSON(json: JSONObject) extends StreamElementsEventJSON(json)
98+
private[streamelements] case class DonationEventJSON(json: JSONObject) extends StreamElementsEventJSON(json)
99+
private[streamelements] case class FollowEventJSON(json: JSONObject) extends StreamElementsEventJSON(json)
100+
private[streamelements] case class CheerEventJSON(json: JSONObject) extends StreamElementsEventJSON(json)
101+
private[streamelements] case class HostEventJSON(json: JSONObject) extends StreamElementsEventJSON(json)
102+
private[streamelements] case class RaidEventJSON(json: JSONObject) extends StreamElementsEventJSON(json)
103+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package org.codeoverflow.chatoverflow.requirement.service.streamelements
2+
3+
import org.codeoverflow.chatoverflow.requirement.impl.EventManager
4+
import org.codeoverflow.chatoverflow.requirement.service.streamelements.StreamElementsConnector._
5+
import org.json.JSONObject
6+
7+
/**
8+
* Listener for websocket events that are emitted by the StreamElements websocket server.
9+
*/
10+
class StreamElementsListener extends EventManager {
11+
12+
def handleEvent(objects: Array[AnyRef]): Unit = {
13+
val json = objects(0).asInstanceOf[JSONObject]
14+
15+
val eventType = json.optString("type")
16+
if (eventType != null) {
17+
val provider = json.getString("provider")
18+
19+
eventType match {
20+
// Youtube's description differs from the usual ones.
21+
// Youtube's subscription is more like a follow of e.g. Twitch or Twitter, it is free for the user,
22+
// Youtube's sponsor is like a Twitch subscription, a paid extra for some perks and
23+
// Youtube's superchat is like a donation/tip, a donation of money that the streamer gets.
24+
// To unify this across all platforms a Youtube sub is a Follow, a Youtube sponsor is a subscription and
25+
// a Youtube superchat is a donation.
26+
case "subscriber" if provider == "youtube" => call(FollowEventJSON(json))
27+
case "sponsor" => call(SubscriptionEventJSON(json))
28+
case "superchat" => call(DonationEventJSON(json))
29+
30+
// Twitch
31+
case "subscriber" => call(SubscriptionEventJSON(json))
32+
case "tip" => call(DonationEventJSON(json))
33+
case "follow" => call(FollowEventJSON(json))
34+
case "cheer" => call(CheerEventJSON(json))
35+
case "host" => call(HostEventJSON(json))
36+
case "raid" => call(RaidEventJSON(json))
37+
case _ =>
38+
}
39+
}
40+
}
41+
}
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
package org.codeoverflow.chatoverflow.requirement.service.streamelements.impl
2+
3+
import java.time.format.DateTimeFormatter
4+
import java.time.{LocalDateTime, OffsetDateTime, ZoneOffset}
5+
import java.util.Currency
6+
7+
import org.codeoverflow.chatoverflow.api.io.dto.User
8+
import org.codeoverflow.chatoverflow.api.io.dto.stat.stream.SubscriptionTier
9+
import org.codeoverflow.chatoverflow.api.io.dto.stat.stream.streamelements._
10+
import org.codeoverflow.chatoverflow.api.io.event.stream.streamelements._
11+
import org.codeoverflow.chatoverflow.api.io.input.event.StreamElementsEventInput
12+
import org.codeoverflow.chatoverflow.registry.Impl
13+
import org.codeoverflow.chatoverflow.requirement.impl.EventInputImpl
14+
import org.codeoverflow.chatoverflow.requirement.service.streamelements.StreamElementsConnector
15+
import org.codeoverflow.chatoverflow.requirement.service.streamelements.StreamElementsConnector._
16+
import org.json.{JSONException, JSONObject}
17+
18+
import scala.reflect.ClassTag
19+
20+
@Impl(impl = classOf[StreamElementsEventInput], connector = classOf[StreamElementsConnector])
21+
class StreamElementsEventInputImpl extends EventInputImpl[StreamElementsEvent, StreamElementsConnector] with StreamElementsEventInput {
22+
23+
override def start(): Boolean = {
24+
sourceConnector.get.registerEventHandler(handleExceptions(onFollow))
25+
sourceConnector.get.registerEventHandler(handleExceptions(onSubscription))
26+
sourceConnector.get.registerEventHandler(handleExceptions(onDonation))
27+
sourceConnector.get.registerEventHandler(handleExceptions(onCheer))
28+
sourceConnector.get.registerEventHandler(handleExceptions(onRaid))
29+
sourceConnector.get.registerEventHandler(handleExceptions(onHost))
30+
true
31+
}
32+
33+
private def handleExceptions[T: ClassTag](handler: T => Unit): T => Unit = event => {
34+
try {
35+
handler(event)
36+
} catch {
37+
case e@(_: JSONException | _: IllegalArgumentException) =>
38+
val jsonClass = implicitly[ClassTag[T]].runtimeClass
39+
logger warn s"Error while parsing follow json of type ${jsonClass.getSimpleName}:"
40+
logger warn s"${e.getClass.getName} - ${e.getMessage}"
41+
}
42+
}
43+
44+
private def onFollow(event: FollowEventJSON): Unit = {
45+
val json = event.json
46+
val data = json.getJSONObject("data")
47+
48+
val follow = new StreamElementsFollow(
49+
parseUser(data),
50+
parseTime(json),
51+
parseProvider(json)
52+
)
53+
call(new StreamElementsFollowEvent(follow))
54+
}
55+
56+
private def onSubscription(event: SubscriptionEventJSON): Unit = {
57+
val json = event.json
58+
val data = json.getJSONObject("data")
59+
60+
val gifted = data.optBoolean("gifted", false)
61+
val sub = new StreamElementsSubscription(
62+
parseUser(data),
63+
parseTime(json),
64+
data.optDouble("amount", 1).toInt,
65+
{
66+
// (judging based on the events from the event simulator that can be seen in the browser console)
67+
// "plan" can be either a string or a number, so we need to handle both cases
68+
val plan = Option(data.opt("plan")).getOrElse("1000").toString
69+
if (plan.toLowerCase == "prime")
70+
SubscriptionTier.PRIME
71+
else
72+
SubscriptionTier.parse(plan.toInt / 1000)
73+
},
74+
gifted,
75+
if (gifted) new User(data.optString("sender")) else null,
76+
parseProvider(json)
77+
)
78+
call(new StreamElementsSubscriptionEvent(sub))
79+
}
80+
81+
private def onDonation(event: DonationEventJSON): Unit = {
82+
val json = event.json
83+
val data = json.getJSONObject("data")
84+
85+
val donation = new StreamElementsDonation(
86+
parseUser(data),
87+
data.getDouble("amount").toFloat,
88+
Currency.getInstance(data.getString("currency")),
89+
parseTime(json),
90+
data.optString("message")
91+
)
92+
call(new StreamElementsDonationEvent(donation))
93+
}
94+
95+
private def onCheer(event: CheerEventJSON): Unit = {
96+
val json = event.json
97+
val data = json.getJSONObject("data")
98+
99+
val cheer = new StreamElementsCheer(
100+
parseUser(data),
101+
data.getInt("amount"),
102+
data.optString("message"),
103+
parseTime(json)
104+
)
105+
call(new StreamElementsCheerEvent(cheer))
106+
}
107+
108+
private def onRaid(event: RaidEventJSON): Unit = {
109+
val json = event.json
110+
val data = json.getJSONObject("data")
111+
112+
val raid = new StreamElementsRaid(
113+
parseUser(data),
114+
data.optString("message"),
115+
data.getInt("amount"),
116+
parseTime(json)
117+
)
118+
call(new StreamElementsRaidEvent(raid))
119+
}
120+
121+
private def onHost(event: HostEventJSON): Unit = {
122+
val json = event.json
123+
val data = json.getJSONObject("data")
124+
125+
val host = new StreamElementsHost(
126+
parseUser(data),
127+
data.optString("message"),
128+
data.getInt("amount"),
129+
parseTime(json)
130+
)
131+
call(new StreamElementsHostEvent(host))
132+
}
133+
134+
override def stop(): Boolean = {
135+
sourceConnector.get.unregisterAllEventListeners
136+
true
137+
}
138+
139+
// Common methods for JSON processing:
140+
141+
private def parseProvider(json: JSONObject): StreamElementsProvider = StreamElementsProvider.parse(json.optString("provider"))
142+
143+
private def parseUser(json: JSONObject): User = {
144+
val username = json.getString("username")
145+
val displayName = json.optString("displayName", username)
146+
new User(username, displayName)
147+
}
148+
149+
private def parseTime(json: JSONObject): OffsetDateTime = {
150+
val utcString = json.getString("createdAt")
151+
LocalDateTime.parse(utcString, DateTimeFormatter.ISO_DATE_TIME).atOffset(ZoneOffset.UTC)
152+
}
153+
}

0 commit comments

Comments
 (0)