Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ public static boolean isUtf8Terminal() {
}
}

/**
* Create a JSON string containing JVM runtime information
*
* @return JSON String
*/
public static String getJavaInformation() {
var s = "{ \"jvm.system.properties\": {";
s += "\"java.vendor\"" + ":\"" + System.getProperty("java.vendor") + "\", ";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ class ActionStats {
durationMillis: Int,
testCase: TestProfile,
indexTestCase: Int,
indexUserProfile: Int,
queueLength: Int,
tsBegin: String,
tsEnd: String,
Expand All @@ -68,7 +67,6 @@ class ActionStats {
r.testPhase = testPhase

r.testId = indexTestCase
r.indexUserProfile = indexUserProfile
r.queueLength = queueLength

r.numActions = numActions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ data class ActionSummary(
var testName: String = "",
var testPhase: String = "",
var testId: Int = 0,
var indexUserProfile: Int = 0,
var queueLength: Int = 0,
var numActions: Int = 0,
var numSuccess: Int = 0,
Expand All @@ -27,9 +26,6 @@ data class ActionSummary(
var maxWt: Double = 0.0,
var pk: List<Double> = mutableListOf(),
var pv: List<Double> = mutableListOf(),

// var avgCpuSystem: Double = 0.0,
// var avgCpuProcess: Double = 0.0
var processCpuTime: Long = 0,
var processCpuCores: Double = 0.0,
var processCpuUtilization: Double = 0.0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ object DataCollector {
durationMillis,
testCase,
indexTestCase,
indexUserProfile,
queueLength,
tsBegin,
tsEnd,
Expand All @@ -53,7 +52,6 @@ object DataCollector {
durationMillis,
testCase,
indexTestCase,
indexUserProfile,
queueLength,
tsBegin,
tsEnd,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import kotlinx.serialization.json.JsonPrimitive
data class RuntimeContext(
val name: String = "",
val numUsers: Int = 0,
val numUsersActive: Int = 0,
val numThreads: Int = 0,
val userParams: Map<String, JsonPrimitive> = mapOf(),
)
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ data class TestProfile(
//
// This value represents the "L" in Little's Law (equation)
//
val queueLengths: List<Int> = listOf(0),
val numUsersActive: Int = 0,

// List of percentile values to report on.
val percentiles: List<Double> = listOf(50.0, 75.0, 90.0, 95.0, 99.0),
Expand Down
68 changes: 23 additions & 45 deletions tulip-runtime/src/main/kotlin/io/github/wfouche/tulip/core/Tulip.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import java.time.format.DateTimeFormatter
import java.util.concurrent.LinkedBlockingQueue as BlockingQueue
import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.TimeUnit
import kotlin.math.abs
import kotlin.system.exitProcess
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
Expand Down Expand Up @@ -207,6 +206,7 @@ private val g_tests = mutableListOf<TestProfile>()
data class ConfigContext(
val enabled: Boolean = false,
@SerialName("num_users") val numUsers: Int = 0,
@SerialName("num_users_active") val numUsersActive: Int = USER_THREAD_QSIZE,
@SerialName("num_threads") val numThreads: Int = 0,
@SerialName("user_params") val userParams: Map<String, JsonPrimitive> = mapOf(),
)
Expand All @@ -229,7 +229,7 @@ data class ConfigTest(
@SerialName("aps_rate") val throughputRate: Double = 0.0,
@SerialName("aps_rate_step_change") val throughputRateStepChange: Double = 0.0,
@SerialName("aps_rate_step_count") val throughputRateStepCount: Int = 1,
@SerialName("worker_thread_queue_size") val workInProgress: Int = 0,
@SerialName("num_users_active") val numUsersActive: Int = 0,
@SerialName("scenario_actions") val actions: List<ConfigAction> = listOf(),
@SerialName("scenario_workflow") val workflow: String = "",
)
Expand Down Expand Up @@ -301,7 +301,7 @@ fun initConfig(text: String): String {
// println("${k}")
val e = entry.value
if (e.enabled) {
val v = RuntimeContext(k, e.numUsers, e.numThreads, e.userParams)
val v = RuntimeContext(k, e.numUsers, e.numUsersActive, e.numThreads, e.userParams)
g_contexts.add(v)
}
}
Expand All @@ -323,7 +323,7 @@ fun initConfig(text: String): String {
arrivalRate = e.throughputRate,
arrivalRateStepChange = e.throughputRateStepChange,
arrivalRateStepCount = e.throughputRateStepCount,
queueLengths = listOf(e.workInProgress),
numUsersActive = e.numUsersActive,
actions =
mutableListOf<Action>().apply {
if (e.workflow.isEmpty()) {
Expand Down Expand Up @@ -392,28 +392,18 @@ val wthread_wait_stats = Histogram(histogramNumberOfSignificantValueDigits)

/*-------------------------------------------------------------------------*/

private fun getQueueLengths(context: RuntimeContext, test: TestProfile): List<Int> {
val list: MutableList<Int> = mutableListOf()
test.queueLengths.forEach { queueLength ->
list.add(
when {
queueLength == 0 ->
if (context.numThreads == 0) context.numUsers * USER_THREAD_QSIZE
else context.numThreads * USER_THREAD_QSIZE
queueLength > 0 ->
if (context.numThreads == 0) context.numUsers * queueLength
else context.numThreads * queueLength // Actions per Thread
else -> abs(queueLength) // Actions across all Threads
}
)
private fun getQueueLengths(context: RuntimeContext, test: TestProfile): Int {
return if (test.numUsersActive != 0) {
test.numUsersActive
} else {
context.numUsersActive
}
return list
}

/*-------------------------------------------------------------------------*/

private fun getTest(context: RuntimeContext, test: TestProfile): TestProfile {
return test.copy(queueLengths = getQueueLengths(context, test))
return test.copy(numUsersActive = getQueueLengths(context, test))
}

/*-------------------------------------------------------------------------*/
Expand All @@ -431,20 +421,12 @@ private fun createActionGenerator(list: List<Int>): Iterator<Int> {

/*-------------------------------------------------------------------------*/

private fun runTest(
testCase: TestProfile,
contextId: Int,
indexTestCase: Int,
indexUserProfile: Int,
queueLength: Int,
) {
private fun runTest(testCase: TestProfile, contextId: Int, indexTestCase: Int, queueLength: Int) {
var cpuTime: Long = 0
var tsBegin = java.time.LocalDateTime.now().format(formatter)
val output = mutableListOf("")
output.add("======================================================================")
output.add(
"= [${contextId}][${indexTestCase}][${indexUserProfile}][${queueLength}] ${testCase.name} - $tsBegin"
)
output.add("= [${contextId}][${indexTestCase}][${queueLength}] ${testCase.name} - $tsBegin")
output.add("======================================================================")
Console.put(output)

Expand Down Expand Up @@ -574,7 +556,7 @@ private fun runTest(
durationMillis,
testCase,
indexTestCase,
indexUserProfile,
0,
queueLength,
tsBegin,
tsEnd,
Expand Down Expand Up @@ -693,7 +675,7 @@ private fun runTest(
durationMillis.toInt(),
testCase,
indexTestCase,
indexUserProfile,
0,
queueLength,
tsBegin,
tsEnd,
Expand All @@ -717,13 +699,7 @@ private fun runTest(

// Pre-warmup
//
// Since we could have 1 or more population set sizes, only perform the
// start-up phase
// on the first set, i.e., with index 0.
//
if (indexUserProfile == 0) {
assignTasks(testCase.duration.startupDurationMillis, "PreWarmup", 0, 0, 0.0)
}
assignTasks(testCase.duration.startupDurationMillis, "PreWarmup", 0, 0, 0.0)

// Warmup
timeMillisEnd = TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
Expand Down Expand Up @@ -781,11 +757,14 @@ private fun runTulip(
Console.put("======================================================================")
Console.put("")
Console.put(" NUM_USERS = $MAX_NUM_USERS")
Console.put(" NUM_USERS_ACTIVE = ${context.numUsersActive}")
if (MAX_NUM_THREADS == 0) {
MAX_NUM_THREADS = MAX_NUM_USERS
Console.put(" NUM_VIRTUAL_THREADS = $MAX_NUM_THREADS")
} else {
Console.put(" NUM_THREADS = $MAX_NUM_THREADS")
Console.put(" NUM_USERS_PER_THREAD = ${MAX_NUM_USERS / MAX_NUM_THREADS}")
}
Console.put(" NUM_THREADS = $MAX_NUM_THREADS")
Console.put(" NUM_USERS_PER_THREAD = ${MAX_NUM_USERS / MAX_NUM_THREADS}")
if ((MAX_NUM_USERS / MAX_NUM_THREADS) * MAX_NUM_THREADS != MAX_NUM_USERS) {
Console.put("")
Console.put("NUM_USERS should equal n*NUM_THREADS, where n >= 1")
Expand All @@ -799,10 +778,9 @@ private fun runTulip(
} else {
g_workflow = workflows[x.workflow]
}
x.queueLengths.forEachIndexed { indexUserProfile, queueLength ->
// Thread.sleep(5000)
runTest(x, contextId, indexTestCase, indexUserProfile, queueLength)
}
val numUsersActive = x.numUsersActive
// Thread.sleep(5000)
runTest(x, contextId, indexTestCase, numUsersActive)
g_workflow = null
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package io.github.wfouche.tulip.core
import io.github.wfouche.tulip.api.TulipUser
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kotlin.time.Clock
import kotlin.time.Instant
import org.HdrHistogram.IntCountsHistogram

var useVirtualThreads = false
Expand Down Expand Up @@ -117,18 +119,28 @@ fun assignTaskToUser(task: Task) {

fun runtimeDonePT() {
// Terminate all platform threads.
userPlatformThreads!!.forEach { thread -> thread!!.tq.put(Task(status = 999)) }
userPlatformThreads!!.forEach { thread ->
thread!!.tq.clear()
thread.tq.put(Task(status = 999))
}

// Wait for all platform threads to exit.
while (userPlatformThreads!!.map { if (it == null) 0 else 1 }.sum() > 0) {
Thread.sleep(500)
}
}

fun displayTimestamp() {
val now: Instant = Clock.System.now()
println("Current Instant: $now")
}

fun runtimeDoneVT() {
// Terminate all virtual threads.
userObjects!!.forEach { user -> user!!.tq.put(Task(status = 999)) }

userObjects!!.forEach { user ->
user!!.tq.clear()
user.tq.put(Task(status = 999))
}
// Wait for all virtual threads to exit.
userObjects!!.forEach { user -> user!!.future!!.get() }
}
Expand Down