|
| 1 | +package org.codeoverflow.chatoverflow.requirement.service.tipeeestream |
| 2 | + |
| 3 | +import java.util.function.Consumer |
| 4 | + |
| 5 | +import io.socket.client.{IO, Socket} |
| 6 | +import org.codeoverflow.chatoverflow.WithLogger |
| 7 | +import org.codeoverflow.chatoverflow.api.io.dto.event.tipeeestream.{TipeeeStreamDonation, TipeeeStreamEvent, TipeeeStreamFollow, TipeeeStreamSubscription} |
| 8 | +import org.codeoverflow.chatoverflow.connector.Connector |
| 9 | +import org.json.{JSONException, JSONObject} |
| 10 | + |
| 11 | +import scala.collection.mutable.ListBuffer |
| 12 | + |
| 13 | +/** |
| 14 | + * The tipeeestream connector connects to the socket.io service to work with incoming events. |
| 15 | + * |
| 16 | + * @param sourceIdentifier the name of the tipeeestream account |
| 17 | + */ |
| 18 | +class TipeeeStreamConnector(override val sourceIdentifier: String) extends Connector(sourceIdentifier) with WithLogger { |
| 19 | + private val eventHandler = ListBuffer[Consumer[TipeeeStreamEvent]]() |
| 20 | + private val apiKey = "apiKey" |
| 21 | + private val username = "username" |
| 22 | + override protected var requiredCredentialKeys: List[String] = List(apiKey, username) |
| 23 | + override protected var optionalCredentialKeys: List[String] = List() |
| 24 | + private var socket: Socket = _ |
| 25 | + |
| 26 | + /** |
| 27 | + * Starts the connector, e.g. creates a connection with its platform. |
| 28 | + */ |
| 29 | + override def start(): Boolean = { |
| 30 | + socket = IO.socket("https://sso-cf.tipeeestream.com").connect() |
| 31 | + socket.on("connect", (_: Any) => { |
| 32 | + logger info "Connected to TipeeStream Socket.io" |
| 33 | + }) |
| 34 | + socket.emit("join-room", getAuthenticationObject) |
| 35 | + logger info "emitted credentials to TipeeSetream Socket.io api" |
| 36 | + socket.on("new-event", (objects: Array[AnyRef]) => { |
| 37 | + serializeObjectToObject(objects) |
| 38 | + }) |
| 39 | + true |
| 40 | + } |
| 41 | + |
| 42 | + def addIncomingEventHandler(handler: Consumer[TipeeeStreamEvent]): Unit = { |
| 43 | + eventHandler += handler |
| 44 | + } |
| 45 | + |
| 46 | + private def serializeObjectToObject(objects : Array[AnyRef]) : Unit = { |
| 47 | + val json: JSONObject = objects(0).asInstanceOf[JSONObject] |
| 48 | + val event: JSONObject = json.getJSONObject("event") |
| 49 | + val eventType: String = event.getString("type") |
| 50 | + eventType match { |
| 51 | + case "subscription" => |
| 52 | + Subscription(event) |
| 53 | + case "donation" => |
| 54 | + Donation(event) |
| 55 | + case "follow" => |
| 56 | + Follow(event) |
| 57 | + case _ => |
| 58 | + } |
| 59 | + } |
| 60 | + |
| 61 | + @throws[JSONException] |
| 62 | + private def Donation(event: JSONObject): Unit = { |
| 63 | + val parameter = event.getJSONObject("parameters") |
| 64 | + val user = parameter.getString("username") |
| 65 | + val message = parameter.getString("formattedMessage") |
| 66 | + val amount = parameter.getDouble("amount") |
| 67 | + val donation: TipeeeStreamDonation = new TipeeeStreamDonation(null, user, message, amount, null, null) |
| 68 | + eventHandler.foreach(_.accept(donation)) |
| 69 | + } |
| 70 | + |
| 71 | + @throws[JSONException] |
| 72 | + private def Subscription(event: JSONObject): Unit = { |
| 73 | + val parameter = event.getJSONObject("parameters") |
| 74 | + val user = parameter.getString("username") |
| 75 | + val message = parameter.getString("formattedMessage") |
| 76 | + val resub = parameter.getInt("resub") |
| 77 | + val subscription: TipeeeStreamSubscription = new TipeeeStreamSubscription(null, user, message, resub) |
| 78 | + eventHandler.foreach(_.accept(subscription)) |
| 79 | + } |
| 80 | + |
| 81 | + @throws[JSONException] |
| 82 | + private def Follow(event: JSONObject): Unit = { |
| 83 | + val parameter = event.getJSONObject("parameters") |
| 84 | + val user = parameter.getString("username") |
| 85 | + val message = parameter.getString("message") |
| 86 | + val follow: TipeeeStreamFollow = new TipeeeStreamFollow(null, user, message) |
| 87 | + eventHandler.foreach(_.accept(follow)) |
| 88 | + } |
| 89 | + |
| 90 | + /** |
| 91 | + * This stops the activity of the connector, e.g. by closing the platform connection. |
| 92 | + */ |
| 93 | + override def stop(): Boolean = { |
| 94 | + socket.close() |
| 95 | + true |
| 96 | + } |
| 97 | + |
| 98 | + private def getAuthenticationObject: JSONObject = { |
| 99 | + val obj = new JSONObject() |
| 100 | + obj.put("room", credentials.get.getValue(apiKey).get) |
| 101 | + obj.put("username", credentials.get.getValue(username).get) |
| 102 | + obj |
| 103 | + } |
| 104 | +} |
0 commit comments