Schedulers

SchedulerService

As with the Event Listener, Reactant offers a more elegant way to handle stream-like scheduling: SchedulerService, which lets you schedule tasks on Bukkit's scheduler while controlling them with ReactiveX.

To use the scheduler streams, first inject SchedulerService into your component.

import dev.reactant.reactant.service.spec.server.SchedulerService

@Component
class MySchedulerExample(
    private val schedulerService: SchedulerService
) : LifeCycleExample {
    override fun onEnable() {
    }
}

Then we can call the following functions to get the corresponding streams:

schedulerService.timer(100).subscribe {
    MyFirstPlugin.log.info("Run after 100 ticks")
}
schedulerService.interval(100, 10).subscribe {
    MyFirstPlugin.log.info("Run after 100 ticks, Run again each 10 ticks")
}
// same as interval(0, 10)
schedulerService.interval(10).subscribe {
    MyFirstPlugin.log.info("Run immediately, Run again each 10 ticks")
}
note

The SchedulerService functions must create their task on the thread they are called from, so never call subscribeOn() on these streams; otherwise an exception will be thrown.
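The delays and periods passed to timer and interval are counted in server ticks. A Bukkit server targets 20 ticks per second, so 100 ticks is nominally 5 seconds, and longer if the server is lagging. A minimal sketch of a conversion helper follows; `TICKS_PER_SECOND`, `secondsToTicks`, and `ticksToMillis` are hypothetical names used for illustration and are not part of Reactant or Bukkit.

```kotlin
// Hypothetical helpers, not part of Reactant or Bukkit.
// Assumes the nominal server rate of 20 ticks per second.
const val TICKS_PER_SECOND = 20L

/** Converts whole seconds to the nominal number of server ticks. */
fun secondsToTicks(seconds: Long): Long = seconds * TICKS_PER_SECOND

/** Converts server ticks to nominal milliseconds (50 ms per tick). */
fun ticksToMillis(ticks: Long): Long = ticks * 1000L / TICKS_PER_SECOND

fun main() {
    println(secondsToTicks(5))  // 100
    println(ticksToMillis(10))  // 500
}
```

With a helper like this, a call such as `schedulerService.timer(secondsToTicks(5))` would read as "run after about five seconds", assuming timer accepts a `Long` tick count.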

Cancel a scheduled task

Cancelling a scheduled task is easy: the task is cancelled automatically when the stream is disposed.

val scheduledTask = schedulerService.timer(10)
    .subscribe { MyFirstPlugin.log.info("This will never be run") }
scheduledTask.dispose()
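When a component starts several scheduled streams, disposing each one by hand gets tedious. One common RxJava pattern is to collect the subscriptions in a `CompositeDisposable` and dispose them together. The sketch below uses plain RxJava 3 `Observable.interval` and `Observable.timer` as stand-ins for the SchedulerService streams so that it can run outside a server; it is one possible way to organize cleanup, not something Reactant requires.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.disposables.CompositeDisposable
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Counts emissions so we can observe that they stop after disposal.
val ticksSeen = AtomicInteger(0)

fun main() {
    val subscriptions = CompositeDisposable()

    // Stand-in for schedulerService.interval(...): emits every 50 ms.
    subscriptions.add(
        Observable.interval(50, TimeUnit.MILLISECONDS)
            .subscribe { ticksSeen.incrementAndGet() }
    )
    // A second stream that would fire after one second if left alone.
    subscriptions.add(
        Observable.timer(1, TimeUnit.SECONDS)
            .subscribe { println("This will never be run") }
    )

    Thread.sleep(200)
    // Disposing the container disposes every subscription added to it.
    subscriptions.dispose()

    val countAtDispose = ticksSeen.get()
    Thread.sleep(200)
    println("Emissions before dispose: $countAtDispose, after waiting: ${ticksSeen.get()}")
}
```

In a plugin, the `dispose()` call would typically go wherever the component is shut down, so that no scheduled task outlives it.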

Asynchronous tasks and switching back to the main thread

Sometimes you need to run I/O, networking, or other heavy tasks. The server is likely to hang if you run them on the main thread.

The Bukkit way is to create an asynchronous task, but with ReactiveX we can switch threads more easily.

import io.reactivex.rxjava3.schedulers.Schedulers

// This can be any stream (e.g. an event stream); interval is used as an example
val stream = schedulerService.interval(10)
stream
    // still on the thread where subscribe {} was called
    .doOnNext { MyFirstPlugin.log.info("1: ${Thread.currentThread().name}") }
    // use observeOn to switch to the io threads
    .observeOn(Schedulers.io()) // the following operations run on io threads
    .map {
        // the thread has changed now
        MyFirstPlugin.log.info("2: ${Thread.currentThread().name}")
        // simulate a heavy task
        Thread.sleep(10000)
        // return the calculated result
        10
    }
    // the following operations are scheduled onto Bukkit's main thread
    .observeOn(schedulerService.mainThreadScheduler)
    // now we are back on the main thread
    .doOnNext { MyFirstPlugin.log.info("3: ${Thread.currentThread().name}") }
    .subscribe {
        MyFirstPlugin.log.info("Answer is ${it}! We can safely call Bukkit API now!")
    }
caution

When you are not running on the server main thread, you must make sure that every API you call is thread-safe. The best way to avoid threading bugs is to switch back to the main thread as soon as your heavy task is done.
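To check the hand-off between threads outside a server, the same observeOn pattern can be tried with plain RxJava 3. In the sketch below, a single-thread executor named `fake-main` is a hypothetical stand-in for Bukkit's main thread, and `Schedulers.from` plays the role that `schedulerService.mainThreadScheduler` plays in the plugin; `runPipeline` is an illustrative name, not a Reactant function.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.Executors

// Runs a two-stage pipeline and returns the thread name seen at each stage.
fun runPipeline(): List<String> {
    // Hypothetical stand-in for the server main thread.
    val mainExecutor = Executors.newSingleThreadExecutor { r -> Thread(r, "fake-main") }
    val mainScheduler = Schedulers.from(mainExecutor)
    try {
        return Observable.just(1)
            // heavy work would go here, off the "main" thread
            .observeOn(Schedulers.io())
            .map { Thread.currentThread().name }
            // hand the result back to the "main" thread
            .observeOn(mainScheduler)
            .map { ioName -> listOf(ioName, Thread.currentThread().name) }
            .blockingFirst()
    } finally {
        mainExecutor.shutdown()
    }
}

fun main() {
    val (heavyStage, finalStage) = runPipeline()
    println("heavy stage ran on: $heavyStage")
    println("final stage ran on: $finalStage")
}
```

The second name should be `fake-main`, which shows that everything after the last observeOn runs on the scheduler you handed it. Only code placed after that point is the right place for calls that are not thread-safe.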