Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.commons.replication

import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.node.NodeClient
import org.opensearch.commons.replication.action.StopIndexReplicationRequest
import org.opensearch.commons.replication.action.ReplicationActions.STOP_REPLICATION_ACTION_TYPE
import org.opensearch.commons.utils.recreateObject
import org.opensearch.core.action.ActionListener
import org.opensearch.core.action.ActionResponse
import org.opensearch.core.common.io.stream.Writeable


/**
* All the transport action plugin interfaces for the cross-cluster-replication plugin.
*/
object ReplicationPluginInterface {

/**
* Stop replication.
* @param client Node client for making transport action
* @param request The request object
* @param listener The listener for getting response
*/

fun stopReplication(
client: NodeClient,
request: StopIndexReplicationRequest,
listener: ActionListener<AcknowledgedResponse>
) {
return client.execute(
STOP_REPLICATION_ACTION_TYPE,
request,
wrapActionListener(listener) { response ->
recreateObject(response) {
AcknowledgedResponse(it)
}
}
)
}

/**
* Wrap action listener on concrete response class by a new created one on ActionResponse.
* This is required because the response may be loaded by different classloader across plugins.
* The onResponse(ActionResponse) avoids type cast exception and give a chance to recreate
* the response object.
*/
@Suppress("UNCHECKED_CAST")
private fun <Response : AcknowledgedResponse> wrapActionListener(
listener: ActionListener<Response>,
recreate: (Writeable) -> Response
): ActionListener<Response> {
return object : ActionListener<ActionResponse> {
override fun onResponse(response: ActionResponse) {
val recreated = response as? Response ?: recreate(response)
listener.onResponse(recreated)
}

override fun onFailure(exception: java.lang.Exception) {
listener.onFailure(exception)
}
} as ActionListener<Response>
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.commons.replication.action

import org.opensearch.action.ActionType
import org.opensearch.action.support.master.AcknowledgedResponse

/**
* All the transport action information for the Replication plugin
*/
object ReplicationActions {

/**
* Stop replication. Internal only - Inter plugin communication.
*/
const val STOP_REPLICATION_NAME = "indices:admin/plugins/replication/index/stop"
const val STOP_REPLICATION_BASE_ACTION_NAME = "indices:admin/plugins/replication/index/unfollow"

/**
* Stop replication transport action type. Internal only - Inter plugin communication.
*/
val STOP_REPLICATION_ACTION_TYPE =
ActionType(STOP_REPLICATION_NAME, ::AcknowledgedResponse)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package org.opensearch.commons.replication.action

import org.opensearch.action.ActionRequestValidationException
import org.apache.logging.log4j.LogManager
import org.opensearch.action.IndicesRequest
import org.opensearch.action.support.IndicesOptions
import org.opensearch.action.support.master.AcknowledgedRequest
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.*
class StopIndexReplicationRequest : AcknowledgedRequest<StopIndexReplicationRequest>, IndicesRequest.Replaceable, ToXContentObject {
lateinit var indexName: String
constructor(indexName: String) {
this.indexName = indexName
}

private constructor() {
}

constructor(inp: StreamInput): super(inp) {
indexName = inp.readString()
}
companion object {
private val PARSER = ObjectParser<StopIndexReplicationRequest, Void>("StopReplicationRequestParser") {
StopIndexReplicationRequest()
}

fun fromXContent(parser: XContentParser, followerIndex: String): StopIndexReplicationRequest {
val stopIndexReplicationRequest = PARSER.parse(parser, null)
stopIndexReplicationRequest.indexName = followerIndex
return stopIndexReplicationRequest
}
private val log = LogManager.getLogger(StopIndexReplicationRequest::class.java)
}

override fun validate(): ActionRequestValidationException? {
return null
}

override fun indices(vararg indices: String?): IndicesRequest {
return this
}
override fun indices(): Array<String> {
return arrayOf(indexName)
}

override fun indicesOptions(): IndicesOptions {
return IndicesOptions.strictSingleIndexNoExpandForbidClosed()
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
builder.field("indexName", indexName)
builder.endObject()
return builder
}

override fun writeTo(out: StreamOutput) {
super.writeTo(out)
out.writeString(indexName)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.replication

import com.nhaarman.mockitokotlin2.whenever
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.mockito.Answers
import org.mockito.ArgumentMatchers
import org.mockito.Mock
import org.mockito.Mockito
import org.mockito.junit.jupiter.MockitoExtension
import org.opensearch.action.ActionType
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.node.NodeClient
import org.opensearch.commons.replication.action.StopIndexReplicationRequest
import org.opensearch.core.action.ActionListener

@Suppress("UNCHECKED_CAST")
@ExtendWith(MockitoExtension::class)
internal class ReplicationPluginInterfaceTests {

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private lateinit var client: NodeClient
@Test
fun stopReplication() {
val request = Mockito.mock(StopIndexReplicationRequest::class.java)
val response = AcknowledgedResponse(true)
val listener: ActionListener<AcknowledgedResponse> =
Mockito.mock(ActionListener::class.java) as ActionListener<AcknowledgedResponse>

Mockito.doAnswer {
(it.getArgument(2) as ActionListener<AcknowledgedResponse>)
.onResponse(response)
}.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any())

ReplicationPluginInterface.stopReplication(client, request, listener)
Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.commons.replication.action

import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.Assertions.assertNull
import org.junit.jupiter.api.Test
import org.opensearch.commons.utils.recreateObject

internal class StopIndexReplicationRequestTests {
@Test
fun `Stop Replication request serialize and deserialize transport object should be equal`() {
val index = "test-idx"
val request = StopIndexReplicationRequest(index)
val recreatedRequest = recreateObject(request) { StopIndexReplicationRequest(it) }
assertNotNull(recreatedRequest)
assertEquals(request.indexName, recreatedRequest.indexName)
assertNull(recreatedRequest.validate())
}
}