server: (router) add stop-timeout option (#18350)
* server: (router) add stop-timeout option * also allow stop while loading * add docs * unload_lru: also wait for unload to complete
This commit is contained in:
+6
-6
@@ -3518,15 +3518,15 @@ void common_params_add_preset_options(std::vector<common_arg> & args) {
|
|||||||
[](common_params &, const std::string &) { /* unused */ }
|
[](common_params &, const std::string &) { /* unused */ }
|
||||||
).set_env(COMMON_ARG_PRESET_LOAD_ON_STARTUP).set_preset_only());
|
).set_env(COMMON_ARG_PRESET_LOAD_ON_STARTUP).set_preset_only());
|
||||||
|
|
||||||
|
args.push_back(common_arg(
|
||||||
|
{"stop-timeout"}, "SECONDS",
|
||||||
|
"in server router mode, force-kill model instance after this many seconds of graceful shutdown",
|
||||||
|
[](common_params &, int) { /* unused */ }
|
||||||
|
).set_env(COMMON_ARG_PRESET_STOP_TIMEOUT).set_preset_only());
|
||||||
|
|
||||||
// args.push_back(common_arg(
|
// args.push_back(common_arg(
|
||||||
// {"pin"},
|
// {"pin"},
|
||||||
// "in server router mode, do not unload this model if models_max is exceeded",
|
// "in server router mode, do not unload this model if models_max is exceeded",
|
||||||
// [](common_params &) { /* unused */ }
|
// [](common_params &) { /* unused */ }
|
||||||
// ).set_preset_only());
|
// ).set_preset_only());
|
||||||
|
|
||||||
// args.push_back(common_arg(
|
|
||||||
// {"unload-idle-seconds"}, "SECONDS",
|
|
||||||
// "in server router mode, unload models idle for more than this many seconds",
|
|
||||||
// [](common_params &, int) { /* unused */ }
|
|
||||||
// ).set_preset_only());
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,6 +10,7 @@
|
|||||||
|
|
||||||
// pseudo-env variable to identify preset-only arguments
|
// pseudo-env variable to identify preset-only arguments
|
||||||
#define COMMON_ARG_PRESET_LOAD_ON_STARTUP "__PRESET_LOAD_ON_STARTUP"
|
#define COMMON_ARG_PRESET_LOAD_ON_STARTUP "__PRESET_LOAD_ON_STARTUP"
|
||||||
|
#define COMMON_ARG_PRESET_STOP_TIMEOUT "__PRESET_STOP_TIMEOUT"
|
||||||
|
|
||||||
//
|
//
|
||||||
// CLI argument parsing
|
// CLI argument parsing
|
||||||
|
|||||||
@@ -1486,6 +1486,7 @@ The precedence rule for preset options is as follows:
|
|||||||
|
|
||||||
We also offer additional options that are exclusive to presets (these aren't treated as command-line arguments):
|
We also offer additional options that are exclusive to presets (these aren't treated as command-line arguments):
|
||||||
- `load-on-startup` (boolean): Controls whether the model loads automatically when the server starts
|
- `load-on-startup` (boolean): Controls whether the model loads automatically when the server starts
|
||||||
|
- `stop-timeout` (int, seconds): After requested unload, wait for this many seconds before forcing termination (default: 10)
|
||||||
|
|
||||||
### Routing requests
|
### Routing requests
|
||||||
|
|
||||||
@@ -1574,8 +1575,7 @@ Payload:
|
|||||||
|
|
||||||
```json
|
```json
|
||||||
{
|
{
|
||||||
"model": "ggml-org/gemma-3-4b-it-GGUF:Q4_K_M",
|
"model": "ggml-org/gemma-3-4b-it-GGUF:Q4_K_M"
|
||||||
"extra_args": ["-n", "128", "--top-k", "4"]
|
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|||||||
+117
-55
@@ -34,6 +34,8 @@
|
|||||||
#include <limits.h>
|
#include <limits.h>
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#define DEFAULT_STOP_TIMEOUT 10 // seconds
|
||||||
|
|
||||||
#define CMD_ROUTER_TO_CHILD_EXIT "cmd_router_to_child:exit"
|
#define CMD_ROUTER_TO_CHILD_EXIT "cmd_router_to_child:exit"
|
||||||
#define CMD_CHILD_TO_ROUTER_READY "cmd_child_to_router:ready"
|
#define CMD_CHILD_TO_ROUTER_READY "cmd_child_to_router:ready"
|
||||||
|
|
||||||
@@ -203,13 +205,14 @@ void server_models::load_models() {
|
|||||||
// convert presets to server_model_meta and add to mapping
|
// convert presets to server_model_meta and add to mapping
|
||||||
for (const auto & preset : final_presets) {
|
for (const auto & preset : final_presets) {
|
||||||
server_model_meta meta{
|
server_model_meta meta{
|
||||||
/* preset */ preset.second,
|
/* preset */ preset.second,
|
||||||
/* name */ preset.first,
|
/* name */ preset.first,
|
||||||
/* port */ 0,
|
/* port */ 0,
|
||||||
/* status */ SERVER_MODEL_STATUS_UNLOADED,
|
/* status */ SERVER_MODEL_STATUS_UNLOADED,
|
||||||
/* last_used */ 0,
|
/* last_used */ 0,
|
||||||
/* args */ std::vector<std::string>(),
|
/* args */ std::vector<std::string>(),
|
||||||
/* exit_code */ 0
|
/* exit_code */ 0,
|
||||||
|
/* stop_timeout */ DEFAULT_STOP_TIMEOUT,
|
||||||
};
|
};
|
||||||
add_model(std::move(meta));
|
add_model(std::move(meta));
|
||||||
}
|
}
|
||||||
@@ -227,6 +230,20 @@ void server_models::load_models() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// handle custom stop-timeout option
|
||||||
|
for (auto & [name, inst] : mapping) {
|
||||||
|
std::string val;
|
||||||
|
if (inst.meta.preset.get_option(COMMON_ARG_PRESET_STOP_TIMEOUT, val)) {
|
||||||
|
try {
|
||||||
|
inst.meta.stop_timeout = std::stoi(val);
|
||||||
|
} catch (...) {
|
||||||
|
SRV_WRN("invalid stop-timeout value '%s' for model '%s', using default %d seconds\n",
|
||||||
|
val.c_str(), name.c_str(), DEFAULT_STOP_TIMEOUT);
|
||||||
|
inst.meta.stop_timeout = DEFAULT_STOP_TIMEOUT;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// load any autoload models
|
// load any autoload models
|
||||||
std::vector<std::string> models_to_load;
|
std::vector<std::string> models_to_load;
|
||||||
for (const auto & [name, inst] : mapping) {
|
for (const auto & [name, inst] : mapping) {
|
||||||
@@ -362,7 +379,7 @@ void server_models::unload_lru() {
|
|||||||
int64_t lru_last_used = ggml_time_ms();
|
int64_t lru_last_used = ggml_time_ms();
|
||||||
size_t count_active = 0;
|
size_t count_active = 0;
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lk(mutex);
|
std::unique_lock<std::mutex> lk(mutex);
|
||||||
for (const auto & m : mapping) {
|
for (const auto & m : mapping) {
|
||||||
if (m.second.meta.is_active()) {
|
if (m.second.meta.is_active()) {
|
||||||
count_active++;
|
count_active++;
|
||||||
@@ -376,6 +393,13 @@ void server_models::unload_lru() {
|
|||||||
if (!lru_model_name.empty() && count_active >= (size_t)base_params.models_max) {
|
if (!lru_model_name.empty() && count_active >= (size_t)base_params.models_max) {
|
||||||
SRV_INF("models_max limit reached, removing LRU name=%s\n", lru_model_name.c_str());
|
SRV_INF("models_max limit reached, removing LRU name=%s\n", lru_model_name.c_str());
|
||||||
unload(lru_model_name);
|
unload(lru_model_name);
|
||||||
|
// wait for unload to complete
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lk(mutex);
|
||||||
|
cv.wait(lk, [this, &lru_model_name]() {
|
||||||
|
return mapping[lru_model_name].meta.status == SERVER_MODEL_STATUS_UNLOADED;
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -436,38 +460,83 @@ void server_models::load(const std::string & name) {
|
|||||||
|
|
||||||
// start a thread to manage the child process
|
// start a thread to manage the child process
|
||||||
// captured variables are guaranteed to be destroyed only after the thread is joined
|
// captured variables are guaranteed to be destroyed only after the thread is joined
|
||||||
inst.th = std::thread([this, name, child_proc = inst.subproc, port = inst.meta.port]() {
|
inst.th = std::thread([this, name, child_proc = inst.subproc, port = inst.meta.port, stop_timeout = inst.meta.stop_timeout]() {
|
||||||
// read stdout/stderr and forward to main server log
|
FILE * stdin_file = subprocess_stdin(child_proc.get());
|
||||||
bool state_received = false; // true if child state received
|
FILE * stdout_file = subprocess_stdout(child_proc.get()); // combined stdout/stderr
|
||||||
FILE * p_stdout_stderr = subprocess_stdout(child_proc.get());
|
|
||||||
if (p_stdout_stderr) {
|
std::thread log_thread([&]() {
|
||||||
char buffer[4096];
|
// read stdout/stderr and forward to main server log
|
||||||
while (fgets(buffer, sizeof(buffer), p_stdout_stderr) != nullptr) {
|
// also handle status report from child process
|
||||||
LOG("[%5d] %s", port, buffer);
|
bool state_received = false; // true if child state received
|
||||||
if (!state_received && std::strstr(buffer, CMD_CHILD_TO_ROUTER_READY) != nullptr) {
|
if (stdout_file) {
|
||||||
// child process is ready
|
char buffer[4096];
|
||||||
this->update_status(name, SERVER_MODEL_STATUS_LOADED);
|
while (fgets(buffer, sizeof(buffer), stdout_file) != nullptr) {
|
||||||
state_received = true;
|
LOG("[%5d] %s", port, buffer);
|
||||||
|
if (!state_received && std::strstr(buffer, CMD_CHILD_TO_ROUTER_READY) != nullptr) {
|
||||||
|
// child process is ready
|
||||||
|
this->update_status(name, SERVER_MODEL_STATUS_LOADED, 0);
|
||||||
|
state_received = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
SRV_ERR("failed to get stdout/stderr of child process for name=%s\n", name.c_str());
|
||||||
}
|
}
|
||||||
} else {
|
});
|
||||||
SRV_ERR("failed to get stdout/stderr of child process for name=%s\n", name.c_str());
|
|
||||||
}
|
std::thread stopping_thread([&]() {
|
||||||
|
// thread to monitor stopping signal
|
||||||
|
auto is_stopping = [this, &name]() {
|
||||||
|
return this->stopping_models.find(name) != this->stopping_models.end();
|
||||||
|
};
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lk(this->mutex);
|
||||||
|
this->cv_stop.wait(lk, is_stopping);
|
||||||
|
}
|
||||||
|
SRV_INF("stopping model instance name=%s\n", name.c_str());
|
||||||
|
// send interrupt to child process
|
||||||
|
fprintf(stdin_file, "%s\n", CMD_ROUTER_TO_CHILD_EXIT);
|
||||||
|
fflush(stdin_file);
|
||||||
|
// wait to stop gracefully or timeout
|
||||||
|
int64_t start_time = ggml_time_ms();
|
||||||
|
while (true) {
|
||||||
|
std::unique_lock<std::mutex> lk(this->mutex);
|
||||||
|
if (!is_stopping()) {
|
||||||
|
return; // already stopped
|
||||||
|
}
|
||||||
|
int64_t elapsed = ggml_time_ms() - start_time;
|
||||||
|
if (elapsed >= stop_timeout * 1000) {
|
||||||
|
// timeout, force kill
|
||||||
|
SRV_WRN("force-killing model instance name=%s after %d seconds timeout\n", name.c_str(), stop_timeout);
|
||||||
|
subprocess_terminate(child_proc.get());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this->cv_stop.wait_for(lk, std::chrono::seconds(1));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
// we reach here when the child process exits
|
// we reach here when the child process exits
|
||||||
|
// note: we cannot join() prior to this point because it will close stdin_file
|
||||||
|
if (log_thread.joinable()) {
|
||||||
|
log_thread.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
// stop the timeout monitoring thread
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lk(this->mutex);
|
||||||
|
stopping_models.erase(name);
|
||||||
|
cv_stop.notify_all();
|
||||||
|
}
|
||||||
|
if (stopping_thread.joinable()) {
|
||||||
|
stopping_thread.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
// get the exit code
|
||||||
int exit_code = 0;
|
int exit_code = 0;
|
||||||
subprocess_join(child_proc.get(), &exit_code);
|
subprocess_join(child_proc.get(), &exit_code);
|
||||||
subprocess_destroy(child_proc.get());
|
subprocess_destroy(child_proc.get());
|
||||||
// update PID and status
|
|
||||||
{
|
// update status and exit code
|
||||||
std::lock_guard<std::mutex> lk(mutex);
|
this->update_status(name, SERVER_MODEL_STATUS_UNLOADED, exit_code);
|
||||||
auto it = mapping.find(name);
|
|
||||||
if (it != mapping.end()) {
|
|
||||||
auto & meta = it->second.meta;
|
|
||||||
meta.exit_code = exit_code;
|
|
||||||
meta.status = SERVER_MODEL_STATUS_UNLOADED;
|
|
||||||
}
|
|
||||||
cv.notify_all();
|
|
||||||
}
|
|
||||||
SRV_INF("instance name=%s exited with status %d\n", name.c_str(), exit_code);
|
SRV_INF("instance name=%s exited with status %d\n", name.c_str(), exit_code);
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -488,22 +557,14 @@ void server_models::load(const std::string & name) {
|
|||||||
cv.notify_all();
|
cv.notify_all();
|
||||||
}
|
}
|
||||||
|
|
||||||
static void interrupt_subprocess(FILE * stdin_file) {
|
|
||||||
// because subprocess.h does not provide a way to send SIGINT,
|
|
||||||
// we will send a command to the child process to exit gracefully
|
|
||||||
if (stdin_file) {
|
|
||||||
fprintf(stdin_file, "%s\n", CMD_ROUTER_TO_CHILD_EXIT);
|
|
||||||
fflush(stdin_file);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void server_models::unload(const std::string & name) {
|
void server_models::unload(const std::string & name) {
|
||||||
std::lock_guard<std::mutex> lk(mutex);
|
std::lock_guard<std::mutex> lk(mutex);
|
||||||
auto it = mapping.find(name);
|
auto it = mapping.find(name);
|
||||||
if (it != mapping.end()) {
|
if (it != mapping.end()) {
|
||||||
if (it->second.meta.is_active()) {
|
if (it->second.meta.is_active()) {
|
||||||
SRV_INF("unloading model instance name=%s\n", name.c_str());
|
SRV_INF("unloading model instance name=%s\n", name.c_str());
|
||||||
interrupt_subprocess(it->second.stdin_file);
|
stopping_models.insert(name);
|
||||||
|
cv_stop.notify_all();
|
||||||
// status change will be handled by the managing thread
|
// status change will be handled by the managing thread
|
||||||
} else {
|
} else {
|
||||||
SRV_WRN("model instance name=%s is not loaded\n", name.c_str());
|
SRV_WRN("model instance name=%s is not loaded\n", name.c_str());
|
||||||
@@ -518,7 +579,8 @@ void server_models::unload_all() {
|
|||||||
for (auto & [name, inst] : mapping) {
|
for (auto & [name, inst] : mapping) {
|
||||||
if (inst.meta.is_active()) {
|
if (inst.meta.is_active()) {
|
||||||
SRV_INF("unloading model instance name=%s\n", name.c_str());
|
SRV_INF("unloading model instance name=%s\n", name.c_str());
|
||||||
interrupt_subprocess(inst.stdin_file);
|
stopping_models.insert(name);
|
||||||
|
cv_stop.notify_all();
|
||||||
// status change will be handled by the managing thread
|
// status change will be handled by the managing thread
|
||||||
}
|
}
|
||||||
// moving the thread to join list to avoid deadlock
|
// moving the thread to join list to avoid deadlock
|
||||||
@@ -532,16 +594,15 @@ void server_models::unload_all() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void server_models::update_status(const std::string & name, server_model_status status) {
|
void server_models::update_status(const std::string & name, server_model_status status, int exit_code) {
|
||||||
// for now, we only allow updating to LOADED status
|
std::unique_lock<std::mutex> lk(mutex);
|
||||||
if (status != SERVER_MODEL_STATUS_LOADED) {
|
auto it = mapping.find(name);
|
||||||
throw std::runtime_error("invalid status value");
|
if (it != mapping.end()) {
|
||||||
}
|
auto & meta = it->second.meta;
|
||||||
auto meta = get_meta(name);
|
meta.status = status;
|
||||||
if (meta.has_value()) {
|
meta.exit_code = exit_code;
|
||||||
meta->status = status;
|
|
||||||
update_meta(name, meta.value());
|
|
||||||
}
|
}
|
||||||
|
cv.notify_all();
|
||||||
}
|
}
|
||||||
|
|
||||||
void server_models::wait_until_loaded(const std::string & name) {
|
void server_models::wait_until_loaded(const std::string & name) {
|
||||||
@@ -568,6 +629,7 @@ bool server_models::ensure_model_loaded(const std::string & name) {
|
|||||||
load(name);
|
load(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// for loading state
|
||||||
SRV_INF("waiting until model name=%s is fully loaded...\n", name.c_str());
|
SRV_INF("waiting until model name=%s is fully loaded...\n", name.c_str());
|
||||||
wait_until_loaded(name);
|
wait_until_loaded(name);
|
||||||
|
|
||||||
@@ -795,7 +857,7 @@ void server_models_routes::init_routes() {
|
|||||||
res_err(res, format_error_response("model is not found", ERROR_TYPE_INVALID_REQUEST));
|
res_err(res, format_error_response("model is not found", ERROR_TYPE_INVALID_REQUEST));
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
if (model->status != SERVER_MODEL_STATUS_LOADED) {
|
if (!model->is_active()) {
|
||||||
res_err(res, format_error_response("model is not loaded", ERROR_TYPE_INVALID_REQUEST));
|
res_err(res, format_error_response("model is not loaded", ERROR_TYPE_INVALID_REQUEST));
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,6 +9,7 @@
|
|||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
#include <set>
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* state diagram:
|
* state diagram:
|
||||||
@@ -56,6 +57,7 @@ struct server_model_meta {
|
|||||||
int64_t last_used = 0; // for LRU unloading
|
int64_t last_used = 0; // for LRU unloading
|
||||||
std::vector<std::string> args; // args passed to the model instance, will be populated by render_args()
|
std::vector<std::string> args; // args passed to the model instance, will be populated by render_args()
|
||||||
int exit_code = 0; // exit code of the model instance process (only valid if status == FAILED)
|
int exit_code = 0; // exit code of the model instance process (only valid if status == FAILED)
|
||||||
|
int stop_timeout = 0; // seconds to wait before force-killing the model instance during shutdown
|
||||||
|
|
||||||
bool is_active() const {
|
bool is_active() const {
|
||||||
return status == SERVER_MODEL_STATUS_LOADED || status == SERVER_MODEL_STATUS_LOADING;
|
return status == SERVER_MODEL_STATUS_LOADED || status == SERVER_MODEL_STATUS_LOADING;
|
||||||
@@ -83,6 +85,10 @@ private:
|
|||||||
std::condition_variable cv;
|
std::condition_variable cv;
|
||||||
std::map<std::string, instance_t> mapping;
|
std::map<std::string, instance_t> mapping;
|
||||||
|
|
||||||
|
// for stopping models
|
||||||
|
std::condition_variable cv_stop;
|
||||||
|
std::set<std::string> stopping_models;
|
||||||
|
|
||||||
common_preset_context ctx_preset;
|
common_preset_context ctx_preset;
|
||||||
|
|
||||||
common_params base_params;
|
common_params base_params;
|
||||||
@@ -119,7 +125,7 @@ public:
|
|||||||
void unload_all();
|
void unload_all();
|
||||||
|
|
||||||
// update the status of a model instance (thread-safe)
|
// update the status of a model instance (thread-safe)
|
||||||
void update_status(const std::string & name, server_model_status status);
|
void update_status(const std::string & name, server_model_status status, int exit_code);
|
||||||
|
|
||||||
// wait until the model instance is fully loaded (thread-safe)
|
// wait until the model instance is fully loaded (thread-safe)
|
||||||
// return when the model is loaded or failed to load
|
// return when the model is loaded or failed to load
|
||||||
|
|||||||
Reference in New Issue
Block a user