test: harden scheduler tests (#12662)

* test: harden scheduler tests

This removes reschedDelay, which was stale code, and adds a
configurable timeout for waitForVRAMRecovery so tests can set the
timeout very short and avoid the scheduler getting stuck and
hitting a test timeout.

* test: tune tests for partial loads

Give stress tests more time when the model is split between CPU/GPU
Daniel Hiltgen 2025-10-17 08:56:44 -07:00 committed by GitHub
parent 270679932f
commit 68e04c7ff8
10 changed files with 195 additions and 143 deletions
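To make the change concrete, here is a minimal sketch of the pattern the renamed TestSched* cases below follow: construct the scheduler, then cap waitForRecovery well under the test's context timeout (the production default set in InitScheduler is 5 seconds). This is a sketch only, assuming it sits in the server package alongside sched_test.go; TestSchedExample is a hypothetical name, not one of the tests in this commit.

package server

import (
	"context"
	"testing"
	"time"
)

// TestSchedExample is a hypothetical stand-in for the TestSched* cases in
// this commit: build the scheduler, then shorten waitForRecovery so a slow
// post-unload VRAM recovery check cannot hold the scheduler long enough to
// trip the test's hard timeout.
func TestSchedExample(t *testing.T) {
	ctx, done := context.WithTimeout(t.Context(), 500*time.Millisecond)
	defer done()

	s := InitScheduler(ctx)
	s.waitForRecovery = 10 * time.Millisecond // production default is 5s
	// ... drive the scheduler as the real tests do ...
}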

View File

@@ -109,6 +109,8 @@ func TestMultiModelStress(t *testing.T) {
 	defer cancel()
 	client, _, cleanup := InitServerConnection(ctx, t)
 	defer cleanup()
+	initialTimeout := 120 * time.Second
+	streamTimeout := 20 * time.Second
 
 	// Make sure all the models are pulled before we get started
 	for _, model := range chosenModels {
@@ -147,6 +149,8 @@ chooseModels:
 		for _, m := range models.Models {
 			if m.SizeVRAM == 0 {
 				slog.Info("model running on CPU", "name", m.Name, "target", targetLoadCount, "chosen", chosenModels[:targetLoadCount])
+				initialTimeout = 240 * time.Second
+				streamTimeout = 30 * time.Second
 				break chooseModels
 			}
 		}
@@ -172,10 +176,7 @@ chooseModels:
 				k := r.Int() % len(reqs)
 				reqs[k].Model = chosenModels[i]
 				slog.Info("Starting", "model", reqs[k].Model, "iteration", j, "request", reqs[k].Messages[0].Content)
-				DoChat(ctx, t, client, reqs[k], resps[k],
-					120*time.Second, // Be extra patient for the model to load initially
-					10*time.Second, // Once results start streaming, fail if they stall
-				)
+				DoChat(ctx, t, client, reqs[k], resps[k], initialTimeout, streamTimeout)
 			}
 		}(i)
 	}

View File

@@ -78,7 +78,7 @@ func TestContextExhaustion(t *testing.T) {
 // Send multiple generate requests with prior context and ensure the response is coherant and expected
 func TestParallelGenerateWithHistory(t *testing.T) {
-	modelOverride := "gpt-oss:20b"
+	modelName := "gpt-oss:20b"
 	req, resp := GenerateRequests()
 	numParallel := 2
 	iterLimit := 2
@@ -88,15 +88,23 @@ func TestParallelGenerateWithHistory(t *testing.T) {
 	defer cancel()
 	client, _, cleanup := InitServerConnection(ctx, t)
 	defer cleanup()
+	initialTimeout := 120 * time.Second
+	streamTimeout := 20 * time.Second
 
 	// Get the server running (if applicable) warm the model up with a single initial request
-	slog.Info("loading", "model", modelOverride)
+	slog.Info("loading", "model", modelName)
 	err := client.Generate(ctx,
-		&api.GenerateRequest{Model: modelOverride, KeepAlive: &api.Duration{Duration: 10 * time.Second}},
+		&api.GenerateRequest{Model: modelName, KeepAlive: &api.Duration{Duration: 10 * time.Second}},
 		func(response api.GenerateResponse) error { return nil },
 	)
 	if err != nil {
-		t.Fatalf("failed to load model %s: %s", modelOverride, err)
+		t.Fatalf("failed to load model %s: %s", modelName, err)
+	}
+	gpuPercent := getGPUPercent(ctx, t, client, modelName)
+	if gpuPercent < 80 {
+		slog.Warn("Low GPU percentage - increasing timeouts", "percent", gpuPercent)
+		initialTimeout = 240 * time.Second
+		streamTimeout = 30 * time.Second
 	}
 
 	var wg sync.WaitGroup
@@ -105,7 +113,7 @@ func TestParallelGenerateWithHistory(t *testing.T) {
 		go func(i int) {
 			defer wg.Done()
 			k := i % len(req)
-			req[k].Model = modelOverride
+			req[k].Model = modelName
 			for j := 0; j < iterLimit; j++ {
 				if time.Now().Sub(started) > softTimeout {
 					slog.Info("exceeded soft timeout, winding down test")
@@ -114,7 +122,7 @@ func TestParallelGenerateWithHistory(t *testing.T) {
 				slog.Info("Starting", "thread", i, "iter", j)
 				// On slower GPUs it can take a while to process the concurrent requests
 				// so we allow a much longer initial timeout
-				c := DoGenerate(ctx, t, client, req[k], resp[k], 120*time.Second, 20*time.Second)
+				c := DoGenerate(ctx, t, client, req[k], resp[k], initialTimeout, streamTimeout)
 				req[k].Context = c
 				req[k].Prompt = "tell me more!"
 			}
@@ -165,7 +173,7 @@ func TestGenerateWithHistory(t *testing.T) {
 // Send multiple chat requests with prior context and ensure the response is coherant and expected
 func TestParallelChatWithHistory(t *testing.T) {
-	modelOverride := "gpt-oss:20b"
+	modelName := "gpt-oss:20b"
 	req, resp := ChatRequests()
 	numParallel := 2
 	iterLimit := 2
@@ -175,15 +183,23 @@ func TestParallelChatWithHistory(t *testing.T) {
 	defer cancel()
 	client, _, cleanup := InitServerConnection(ctx, t)
 	defer cleanup()
+	initialTimeout := 120 * time.Second
+	streamTimeout := 20 * time.Second
 
 	// Get the server running (if applicable) warm the model up with a single initial empty request
-	slog.Info("loading", "model", modelOverride)
+	slog.Info("loading", "model", modelName)
 	err := client.Generate(ctx,
-		&api.GenerateRequest{Model: modelOverride, KeepAlive: &api.Duration{Duration: 10 * time.Second}},
+		&api.GenerateRequest{Model: modelName, KeepAlive: &api.Duration{Duration: 10 * time.Second}},
 		func(response api.GenerateResponse) error { return nil },
 	)
 	if err != nil {
-		t.Fatalf("failed to load model %s: %s", modelOverride, err)
+		t.Fatalf("failed to load model %s: %s", modelName, err)
+	}
+	gpuPercent := getGPUPercent(ctx, t, client, modelName)
+	if gpuPercent < 80 {
+		slog.Warn("Low GPU percentage - increasing timeouts", "percent", gpuPercent)
+		initialTimeout = 240 * time.Second
+		streamTimeout = 30 * time.Second
 	}
 
 	var wg sync.WaitGroup
@@ -192,7 +208,7 @@ func TestParallelChatWithHistory(t *testing.T) {
 		go func(i int) {
 			defer wg.Done()
 			k := i % len(req)
-			req[k].Model = modelOverride
+			req[k].Model = modelName
 			for j := 0; j < iterLimit; j++ {
 				if time.Now().Sub(started) > softTimeout {
 					slog.Info("exceeded soft timeout, winding down test")
@@ -201,7 +217,7 @@ func TestParallelChatWithHistory(t *testing.T) {
 				slog.Info("Starting", "thread", i, "iter", j)
 				// On slower GPUs it can take a while to process the concurrent requests
 				// so we allow a much longer initial timeout
-				assistant := DoChat(ctx, t, client, req[k], resp[k], 120*time.Second, 20*time.Second)
+				assistant := DoChat(ctx, t, client, req[k], resp[k], initialTimeout, streamTimeout)
 				if assistant == nil {
 					t.Fatalf("didn't get an assistant response for context")
 				}

View File

@@ -65,6 +65,23 @@ func TestModelsChat(t *testing.T) {
 				}
 			}
 		}
+		initialTimeout := 120 * time.Second
+		streamTimeout := 30 * time.Second
+		slog.Info("loading", "model", model)
+		err := client.Generate(ctx,
+			&api.GenerateRequest{Model: model, KeepAlive: &api.Duration{Duration: 10 * time.Second}},
+			func(response api.GenerateResponse) error { return nil },
+		)
+		if err != nil {
+			t.Fatalf("failed to load model %s: %s", model, err)
+		}
+		gpuPercent := getGPUPercent(ctx, t, client, model)
+		if gpuPercent < 80 {
+			slog.Warn("Low GPU percentage - increasing timeouts", "percent", gpuPercent)
+			initialTimeout = 240 * time.Second
+			streamTimeout = 40 * time.Second
+		}
+
 		// TODO - fiddle with context size
 		req := api.ChatRequest{
 			Model: model,
@@ -80,7 +97,7 @@
 				"seed": 123,
 			},
 		}
-		DoChat(ctx, t, client, req, blueSkyExpected, 120*time.Second, 30*time.Second)
+		DoChat(ctx, t, client, req, blueSkyExpected, initialTimeout, streamTimeout)
 		// best effort unload once we're done with the model
 		client.Generate(ctx, &api.GenerateRequest{Model: req.Model, KeepAlive: &api.Duration{Duration: 0}}, func(rsp api.GenerateResponse) error { return nil })
 	})

View File

@@ -743,6 +743,13 @@ func skipUnderMinVRAM(t *testing.T, gb uint64) {
 
 // Skip if the target model isn't X% GPU loaded to avoid excessive runtime
 func skipIfNotGPULoaded(ctx context.Context, t *testing.T, client *api.Client, model string, minPercent int) {
+	gpuPercent := getGPUPercent(ctx, t, client, model)
+	if gpuPercent < minPercent {
+		t.Skip(fmt.Sprintf("test requires minimum %d%% GPU load, but model %s only has %d%%", minPercent, model, gpuPercent))
+	}
+}
+
+func getGPUPercent(ctx context.Context, t *testing.T, client *api.Client, model string) int {
 	models, err := client.ListRunning(ctx)
 	if err != nil {
 		t.Fatalf("failed to list running models: %s", err)
@@ -772,12 +779,10 @@ func skipIfNotGPULoaded(ctx context.Context, t *testing.T, client *api.Client, m
 			cpuPercent := math.Round(float64(sizeCPU) / float64(m.Size) * 110)
 			gpuPercent = int(100 - cpuPercent)
 		}
-		if gpuPercent < minPercent {
-			t.Skip(fmt.Sprintf("test requires minimum %d%% GPU load, but model %s only has %d%%", minPercent, model, gpuPercent))
-		}
-		return
+		return gpuPercent
 	}
-	t.Skip(fmt.Sprintf("model %s not loaded - actually loaded: %v", model, loaded))
+	t.Fatalf("model %s not loaded - actually loaded: %v", model, loaded)
+	return 0
 }
 
 func getTimeouts(t *testing.T) (soft time.Duration, hard time.Duration) {

View File

@@ -38,7 +38,7 @@ func TestGenerateDebugRenderOnly(t *testing.T) {
 		newServerFn: newMockServer(&mock),
 		getGpuFn: getGpuFn,
 		getCpuFn: getCpuFn,
-		reschedDelay: 250 * time.Millisecond,
+		waitForRecovery: 250 * time.Millisecond,
 		loadFn: func(req *LlmRequest, _ *ggml.GGML, _ discover.GpuInfoList, _ bool) bool {
 			// add small delay to simulate loading
 			time.Sleep(time.Millisecond)
@@ -231,7 +231,7 @@ func TestChatDebugRenderOnly(t *testing.T) {
 		newServerFn: newMockServer(&mock),
 		getGpuFn: getGpuFn,
 		getCpuFn: getCpuFn,
-		reschedDelay: 250 * time.Millisecond,
+		waitForRecovery: 250 * time.Millisecond,
 		loadFn: func(req *LlmRequest, _ *ggml.GGML, _ discover.GpuInfoList, _ bool) bool {
 			// add small delay to simulate loading
 			time.Sleep(time.Millisecond)

View File

@@ -43,7 +43,7 @@ func TestGenerateWithBuiltinRenderer(t *testing.T) {
 		newServerFn: newMockServer(&mock),
 		getGpuFn: getGpuFn,
 		getCpuFn: getCpuFn,
-		reschedDelay: 250 * time.Millisecond,
+		waitForRecovery: 250 * time.Millisecond,
 		loadFn: func(req *LlmRequest, _ *ggml.GGML, _ discover.GpuInfoList, _ bool) bool {
 			time.Sleep(time.Millisecond)
 			req.successCh <- &runnerRef{
@@ -227,7 +227,7 @@ func TestGenerateWithDebugRenderOnly(t *testing.T) {
 		newServerFn: newMockServer(&mock),
 		getGpuFn: getGpuFn,
 		getCpuFn: getCpuFn,
-		reschedDelay: 250 * time.Millisecond,
+		waitForRecovery: 250 * time.Millisecond,
 		loadFn: func(req *LlmRequest, _ *ggml.GGML, _ discover.GpuInfoList, _ bool) bool {
 			time.Sleep(time.Millisecond)
 			req.successCh <- &runnerRef{

View File

@@ -76,7 +76,7 @@ func TestGenerateChat(t *testing.T) {
 		newServerFn: newMockServer(&mock),
 		getGpuFn: getGpuFn,
 		getCpuFn: getCpuFn,
-		reschedDelay: 250 * time.Millisecond,
+		waitForRecovery: 250 * time.Millisecond,
 		loadFn: func(req *LlmRequest, _ *ggml.GGML, _ discover.GpuInfoList, _ bool) bool {
 			// add small delay to simulate loading
 			time.Sleep(time.Millisecond)
@@ -687,7 +687,7 @@ func TestGenerate(t *testing.T) {
 		newServerFn: newMockServer(&mock),
 		getGpuFn: getGpuFn,
 		getCpuFn: getCpuFn,
-		reschedDelay: 250 * time.Millisecond,
+		waitForRecovery: 250 * time.Millisecond,
 		loadFn: func(req *LlmRequest, _ *ggml.GGML, _ discover.GpuInfoList, _ bool) bool {
 			// add small delay to simulate loading
 			time.Sleep(time.Millisecond)
@@ -1112,7 +1112,7 @@ func TestChatWithPromptEndingInThinkTag(t *testing.T) {
 		newServerFn: newMockServer(mock),
 		getGpuFn: getGpuFn,
 		getCpuFn: getCpuFn,
-		reschedDelay: 250 * time.Millisecond,
+		waitForRecovery: 250 * time.Millisecond,
 		loadFn: func(req *LlmRequest, _ *ggml.GGML, _ discover.GpuInfoList, _ bool) bool {
 			time.Sleep(time.Millisecond)
 			req.successCh <- &runnerRef{llama: mock}

View File

@@ -276,7 +276,7 @@ func TestChatHarmonyParserStreamingRealtime(t *testing.T) {
 		newServerFn: newMockServer(&mock),
 		getGpuFn: getGpuFn,
 		getCpuFn: getCpuFn,
-		reschedDelay: 100 * time.Millisecond,
+		waitForRecovery: 100 * time.Millisecond,
 		loadFn: func(req *LlmRequest, _ *ggml.GGML, _ discover.GpuInfoList, _ bool) bool {
 			req.successCh <- &runnerRef{
 				llama: &mock,
@@ -427,7 +427,7 @@ func TestChatHarmonyParserStreamingSimple(t *testing.T) {
 		newServerFn: newMockServer(&mock),
 		getGpuFn: getGpuFn,
 		getCpuFn: getCpuFn,
-		reschedDelay: 100 * time.Millisecond,
+		waitForRecovery: 100 * time.Millisecond,
 		loadFn: func(req *LlmRequest, _ *ggml.GGML, _ discover.GpuInfoList, _ bool) bool {
 			req.successCh <- &runnerRef{
 				llama: &mock,
@@ -609,7 +609,7 @@ func TestChatHarmonyParserStreaming(t *testing.T) {
 		newServerFn: newMockServer(&mock),
 		getGpuFn: getGpuFn,
 		getCpuFn: getCpuFn,
-		reschedDelay: 250 * time.Millisecond,
+		waitForRecovery: 250 * time.Millisecond,
 		loadFn: func(req *LlmRequest, _ *ggml.GGML, _ discover.GpuInfoList, _ bool) bool {
 			req.successCh <- &runnerRef{
 				llama: &mock,

View File

@@ -56,7 +56,9 @@ type Scheduler struct {
 	newServerFn func(gpus discover.GpuInfoList, model string, f *ggml.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error)
 	getGpuFn func(ctx context.Context, runners []discover.FilteredRunnerDiscovery) discover.GpuInfoList
 	getCpuFn func() discover.GpuInfo
-	reschedDelay time.Duration
+
+	// waitForRecovery sets the limit for how long to wait for memory usage to recover after unload before scheduling the next model
+	waitForRecovery time.Duration
 }
 
 // Default automatic value for number of models we allow per GPU
@@ -77,7 +79,7 @@ func InitScheduler(ctx context.Context) *Scheduler {
 		newServerFn: llm.NewLlamaServer,
 		getGpuFn: discover.GetGPUInfo,
 		getCpuFn: discover.GetCPUInfo,
-		reschedDelay: 250 * time.Millisecond,
+		waitForRecovery: 5 * time.Second,
 	}
 	sched.loadFn = sched.load
 	return sched
@@ -650,8 +652,8 @@ func (s *Scheduler) waitForVRAMRecovery(runner *runnerRef, runners []discover.Fi
 	freeMemoryNow := freeMemoryBefore
 	go func() {
-		// typical convergence is 0.5-1.5s - If it takes more than 5 seconds to discover and converge, let the scheduler estimate VRAM usage
-		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+		// typical convergence is 0.5-1.5s - If it takes too long to discover and converge, let the scheduler estimate VRAM usage
+		ctx, cancel := context.WithTimeout(context.Background(), s.waitForRecovery)
 		defer cancel()
 		ticker := time.NewTicker(250 * time.Millisecond)
 		defer ticker.Stop()

View File

@@ -26,7 +26,7 @@ func TestMain(m *testing.M) {
 	os.Exit(m.Run())
 }
 
-func TestInitScheduler(t *testing.T) {
+func TestSchedInit(t *testing.T) {
 	ctx, done := context.WithCancel(t.Context())
 	defer done()
 	s := InitScheduler(ctx)
@@ -35,10 +35,11 @@
 	s.loadedMu.Unlock()
 }
 
-func TestLoad(t *testing.T) {
+func TestSchedLoad(t *testing.T) {
 	ctx, done := context.WithTimeout(t.Context(), 20*time.Millisecond)
 	defer done()
 	s := InitScheduler(ctx)
+	s.waitForRecovery = 10 * time.Millisecond
 	var f *ggml.GGML // value not used in tests
 	req := &LlmRequest{
 		ctx: ctx,
@@ -167,10 +168,11 @@ func getCpuFn() discover.GpuInfo {
 	return g
 }
 
-func TestRequestsSameModelSameRequest(t *testing.T) {
+func TestSchedRequestsSameModelSameRequest(t *testing.T) {
 	ctx, done := context.WithTimeout(t.Context(), 500*time.Millisecond)
 	defer done()
 	s := InitScheduler(ctx)
+	s.waitForRecovery = 10 * time.Millisecond
 	s.getGpuFn = getGpuFn
 	s.getCpuFn = getCpuFn
 	a := newScenarioRequest(t, ctx, "ollama-model-1", 10, &api.Duration{Duration: 5 * time.Millisecond}, nil)
@@ -210,10 +212,11 @@
 	}
 }
 
-func TestRequestsSimpleReloadSameModel(t *testing.T) {
+func TestSchedRequestsSimpleReloadSameModel(t *testing.T) {
 	ctx, done := context.WithTimeout(t.Context(), 5000*time.Millisecond)
 	defer done()
 	s := InitScheduler(ctx)
+	s.waitForRecovery = 10 * time.Millisecond
 	s.getGpuFn = getGpuFn
 	s.getCpuFn = getCpuFn
 	a := newScenarioRequest(t, ctx, "ollama-model-1", 10, &api.Duration{Duration: 5 * time.Millisecond}, nil)
@@ -267,10 +270,11 @@
 	}
 }
 
-func TestRequestsMultipleLoadedModels(t *testing.T) {
+func TestSchedRequestsMultipleLoadedModels(t *testing.T) {
 	ctx, done := context.WithTimeout(t.Context(), 500*time.Millisecond)
 	defer done()
 	s := InitScheduler(ctx)
+	s.waitForRecovery = 10 * time.Millisecond
 	s.getGpuFn = getGpuFn // 1 metal GPU
 	s.getCpuFn = getCpuFn // 1 CPU
@@ -389,7 +393,7 @@ closeWait:
 	s.loadedMu.Unlock()
 }
 
-func TestGetRunner(t *testing.T) {
+func TestSchedGetRunner(t *testing.T) {
 	ctx, done := context.WithTimeout(t.Context(), 3*time.Second)
 	defer done()
@@ -398,6 +402,7 @@ func TestGetRunner(t *testing.T) {
 	c := newScenarioRequest(t, ctx, "ollama-model-1c", 10, &api.Duration{Duration: 2 * time.Millisecond}, nil)
 	t.Setenv("OLLAMA_MAX_QUEUE", "1")
 	s := InitScheduler(ctx)
+	s.waitForRecovery = 10 * time.Millisecond
 	s.getGpuFn = getGpuFn
 	s.getCpuFn = getCpuFn
 	s.newServerFn = a.newServer
@@ -442,10 +447,11 @@ func TestGetRunner(t *testing.T) {
 	b.ctxDone()
 }
 
-func TestExpireRunner(t *testing.T) {
+func TestSchedExpireRunner(t *testing.T) {
 	ctx, done := context.WithTimeout(t.Context(), 20*time.Millisecond)
 	defer done()
 	s := InitScheduler(ctx)
+	s.waitForRecovery = 10 * time.Millisecond
 	req := &LlmRequest{
 		ctx: ctx,
 		model: &Model{ModelPath: "foo"},
@@ -490,13 +496,14 @@ func TestExpireRunner(t *testing.T) {
 }
 
 // TODO - add one scenario that triggers the bogus finished event with positive ref count
-func TestPrematureExpired(t *testing.T) {
+func TestSchedPrematureExpired(t *testing.T) {
 	ctx, done := context.WithTimeout(t.Context(), 500*time.Millisecond)
 	defer done()
 
 	// Same model, same request
 	scenario1a := newScenarioRequest(t, ctx, "ollama-model-1a", 10, nil, nil)
 	s := InitScheduler(ctx)
+	s.waitForRecovery = 10 * time.Millisecond
 	s.getGpuFn = func(ctx context.Context, runners []discover.FilteredRunnerDiscovery) discover.GpuInfoList {
 		g := discover.GpuInfo{DeviceID: ml.DeviceID{Library: "metal"}}
 		g.TotalMemory = 24 * format.GigaByte
@@ -537,7 +544,7 @@ func TestPrematureExpired(t *testing.T) {
 	time.Sleep(5 * time.Millisecond)
 }
 
-func TestUseLoadedRunner(t *testing.T) {
+func TestSchedUseLoadedRunner(t *testing.T) {
 	ctx, done := context.WithTimeout(t.Context(), 100*time.Millisecond)
 	req := &LlmRequest{
 		ctx: ctx,
@@ -564,7 +571,7 @@ func TestUseLoadedRunner(t *testing.T) {
 	require.Equal(t, req, fin)
 }
 
-func TestUpdateFreeSpace(t *testing.T) {
+func TestSchedUpdateFreeSpace(t *testing.T) {
 	ctx, done := context.WithTimeout(t.Context(), 100*time.Millisecond)
 	defer done()
 	gpus := discover.GpuInfoList{
@@ -597,6 +604,7 @@ func TestUpdateFreeSpace(t *testing.T) {
 	r2 := &runnerRef{llama: llm2, gpus: gpuIDs, numParallel: 1}
 
 	s := InitScheduler(ctx)
+	s.waitForRecovery = 10 * time.Millisecond
 	s.loadedMu.Lock()
 	s.loaded["a"] = r1
 	s.loaded["b"] = r2
@@ -607,7 +615,7 @@ func TestUpdateFreeSpace(t *testing.T) {
 	require.Equal(t, uint64(2000-50-75), gpus[1].FreeMemory)
 }
 
-func TestFindRunnerToUnload(t *testing.T) {
+func TestSchedFindRunnerToUnload(t *testing.T) {
 	ctx, done := context.WithTimeout(t.Context(), 100*time.Millisecond)
 	defer done()
@@ -615,6 +623,7 @@ func TestFindRunnerToUnload(t *testing.T) {
 	r2 := &runnerRef{sessionDuration: 2, numParallel: 1}
 
 	s := InitScheduler(ctx)
+	s.waitForRecovery = 10 * time.Millisecond
 	s.loadedMu.Lock()
 	s.loaded["a"] = r1
 	s.loaded["b"] = r2
@@ -627,7 +636,7 @@ func TestFindRunnerToUnload(t *testing.T) {
 	require.Equal(t, r1, resp)
 }
 
-func TestNeedsReload(t *testing.T) {
+func TestSchedNeedsReload(t *testing.T) {
 	ctx, done := context.WithTimeout(t.Context(), 100*time.Millisecond)
 	defer done()
@@ -674,13 +683,14 @@ func TestNeedsReload(t *testing.T) {
 	require.False(t, resp)
 }
 
-func TestUnloadAllRunners(t *testing.T) {
+func TestSchedUnloadAllRunners(t *testing.T) {
 	ctx, done := context.WithTimeout(t.Context(), 100*time.Millisecond)
 	defer done()
 
 	llm1 := &mockLlm{vramByGPU: map[ml.DeviceID]uint64{}}
 	llm2 := &mockLlm{vramByGPU: map[ml.DeviceID]uint64{}}
 	s := InitScheduler(ctx)
+	s.waitForRecovery = 10 * time.Millisecond
 	s.unloadAllRunners()
 
 	r1 := &runnerRef{llama: llm1, numParallel: 1}
@@ -696,7 +706,7 @@ func TestUnloadAllRunners(t *testing.T) {
 	require.True(t, llm2.closeCalled)
 }
 
-func TestUnload(t *testing.T) {
+func TestSchedUnload(t *testing.T) {
 	llm1 := &mockLlm{vramByGPU: map[ml.DeviceID]uint64{}}
 	r1 := &runnerRef{llama: llm1, numParallel: 1}
 	r2 := &runnerRef{model: &Model{AdapterPaths: []string{"A"}}, numParallel: 1}
@@ -706,13 +716,14 @@ func TestUnload(t *testing.T) {
 	require.Nil(t, r2.model)
 }
 
-func TestAlreadyCanceled(t *testing.T) {
+func TestSchedAlreadyCanceled(t *testing.T) {
 	ctx, done := context.WithTimeout(t.Context(), 500*time.Millisecond)
 	defer done()
 	dctx, done2 := context.WithCancel(ctx)
 	done2()
 	scenario1a := newScenarioRequest(t, dctx, "ollama-model-1", 10, &api.Duration{Duration: 0}, nil)
 	s := InitScheduler(ctx)
+	s.waitForRecovery = 10 * time.Millisecond
 	slog.Info("scenario1a")
 	s.pendingReqCh <- scenario1a.req
 	require.Len(t, s.pendingReqCh, 1)