Merge branch 'ollama:main' into main

Author: likelovewant · 2024-05-16 22:24:44 +08:00 · committed by GitHub
18 changed files with 285 additions and 98 deletions

View File

@@ -377,7 +377,7 @@ See the [API documentation](./docs/api.md) for all endpoints.
- [Testcontainers](https://testcontainers.com/modules/ollama/)
- [Portkey](https://portkey.ai/docs/welcome/integration-guides/ollama)
- [PromptingTools.jl](https://github.com/svilupp/PromptingTools.jl) with an [example](https://svilupp.github.io/PromptingTools.jl/dev/examples/working_with_ollama)
- [LlamaScript](https://github.com/WolfTheDeveloper/llamascript)
- [LlamaScript](https://github.com/Project-Llama/llamascript)
### Mobile
- [Enchanted](https://github.com/AugustDev/enchanted)

View File

@@ -354,6 +354,15 @@ func (c *Client) List(ctx context.Context) (*ListResponse, error) {
return &lr, nil
}
// List running models.
func (c *Client) ListRunning(ctx context.Context) (*ListResponse, error) {
var lr ListResponse
if err := c.do(ctx, http.MethodGet, "/api/ps", nil, &lr); err != nil {
return nil, err
}
return &lr, nil
}
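For orientation, a minimal client-side sketch of calling the new method (assuming a server reachable through the usual `OLLAMA_HOST` environment configuration; error handling trimmed):

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/ollama/ollama/api"
)

func main() {
	// ClientFromEnvironment reads OLLAMA_HOST (default 127.0.0.1:11434).
	client, err := api.ClientFromEnvironment()
	if err != nil {
		log.Fatal(err)
	}

	// ListRunning hits the new GET /api/ps endpoint.
	running, err := client.ListRunning(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	for _, m := range running.Models {
		fmt.Println(m.Name, m.SizeVRAM)
	}
}
```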
// Copy copies a model - creating a model with another name from an existing
// model.
func (c *Client) Copy(ctx context.Context, req *CopyRequest) error {

View File

@@ -289,10 +289,12 @@ type ListResponse struct {
type ModelResponse struct {
Name string `json:"name"`
Model string `json:"model"`
ModifiedAt time.Time `json:"modified_at"`
ModifiedAt time.Time `json:"modified_at,omitempty"`
Size int64 `json:"size"`
Digest string `json:"digest"`
Details ModelDetails `json:"details,omitempty"`
ExpiresAt time.Time `json:"expires_at,omitempty"`
SizeVRAM int64 `json:"size_vram,omitempty"`
}
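Purely for illustration, a sketch of how the extended struct might serialize; every field value below is made up, and `omitempty` drops fields left at their zero value:

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/ollama/ollama/api"
)

func main() {
	m := api.ModelResponse{
		Name:      "llama3:latest", // hypothetical model
		Model:     "llama3:latest",
		Size:      5_000_000_000, // total bytes, made up
		SizeVRAM:  5_000_000_000, // fully resident in VRAM
		Digest:    "0123456789abcdef",
		ExpiresAt: time.Now().Add(5 * time.Minute),
	}
	out, _ := json.MarshalIndent(m, "", "  ")
	fmt.Println(string(out))
}
```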
type TokenResponse struct {

View File

@@ -12,6 +12,7 @@ import (
"fmt"
"io"
"log"
"math"
"net"
"net/http"
"os"
@@ -324,6 +325,18 @@ func RunHandler(cmd *cobra.Command, args []string) error {
}
opts.Format = format
keepAlive, err := cmd.Flags().GetString("keepalive")
if err != nil {
return err
}
if keepAlive != "" {
d, err := time.ParseDuration(keepAlive)
if err != nil {
return err
}
opts.KeepAlive = &api.Duration{Duration: d}
}
prompts := args[1:]
// prepend stdin to the prompt if provided
if !term.IsTerminal(int(os.Stdin.Fd())) {
@@ -496,6 +509,52 @@ func ListHandler(cmd *cobra.Command, args []string) error {
return nil
}
func ListRunningHandler(cmd *cobra.Command, args []string) error {
client, err := api.ClientFromEnvironment()
if err != nil {
return err
}
models, err := client.ListRunning(cmd.Context())
if err != nil {
return err
}
var data [][]string
for _, m := range models.Models {
if len(args) == 0 || strings.HasPrefix(m.Name, args[0]) {
var procStr string
switch {
case m.SizeVRAM == 0:
procStr = "100% CPU"
case m.SizeVRAM == m.Size:
procStr = "100% GPU"
case m.SizeVRAM > m.Size || m.Size == 0:
procStr = "Unknown"
default:
sizeCPU := m.Size - m.SizeVRAM
cpuPercent := math.Round(float64(sizeCPU) / float64(m.Size) * 100)
procStr = fmt.Sprintf("%d%%/%d%% CPU/GPU", int(cpuPercent), int(100-cpuPercent))
}
data = append(data, []string{m.Name, m.Digest[:12], format.HumanBytes(m.Size), procStr, format.HumanTime(m.ExpiresAt, "Never")})
}
}
table := tablewriter.NewWriter(os.Stdout)
table.SetHeader([]string{"NAME", "ID", "SIZE", "PROCESSOR", "UNTIL"})
table.SetHeaderAlignment(tablewriter.ALIGN_LEFT)
table.SetAlignment(tablewriter.ALIGN_LEFT)
table.SetHeaderLine(false)
table.SetBorder(false)
table.SetNoWhiteSpace(true)
table.SetTablePadding("\t")
table.AppendBulk(data)
table.Render()
return nil
}
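To make the processor-split arithmetic concrete, here is a standalone sketch with made-up sizes: a model of total size 8 GiB with 6 GiB resident in VRAM leaves 2 GiB in system memory, which renders as `25%/75% CPU/GPU`:

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	var size int64 = 8 << 30     // hypothetical total model size: 8 GiB
	var sizeVRAM int64 = 6 << 30 // hypothetical amount resident in VRAM: 6 GiB

	sizeCPU := size - sizeVRAM
	cpuPercent := math.Round(float64(sizeCPU) / float64(size) * 100)
	fmt.Printf("%d%%/%d%% CPU/GPU\n", int(cpuPercent), int(100-cpuPercent))
	// Output: 25%/75% CPU/GPU
}
```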
func DeleteHandler(cmd *cobra.Command, args []string) error {
client, err := api.ClientFromEnvironment()
if err != nil {
@@ -672,6 +731,7 @@ type runOptions struct {
Images []api.ImageData
Options map[string]interface{}
MultiModal bool
KeepAlive *api.Duration
}
type displayResponseState struct {
@@ -766,6 +826,10 @@ func chat(cmd *cobra.Command, opts runOptions) (*api.Message, error) {
Options: opts.Options,
}
if opts.KeepAlive != nil {
req.KeepAlive = opts.KeepAlive
}
if err := client.Chat(cancelCtx, req, fn); err != nil {
if errors.Is(err, context.Canceled) {
return nil, nil
@@ -841,14 +905,15 @@ func generate(cmd *cobra.Command, opts runOptions) error {
}
request := api.GenerateRequest{
Model: opts.Model,
Prompt: opts.Prompt,
Context: generateContext,
Images: opts.Images,
Format: opts.Format,
System: opts.System,
Template: opts.Template,
Options: opts.Options,
Model: opts.Model,
Prompt: opts.Prompt,
Context: generateContext,
Images: opts.Images,
Format: opts.Format,
System: opts.System,
Template: opts.Template,
Options: opts.Options,
KeepAlive: opts.KeepAlive,
}
if err := client.Generate(ctx, &request, fn); err != nil {
@@ -1075,6 +1140,7 @@ func NewCLI() *cobra.Command {
RunE: RunHandler,
}
runCmd.Flags().String("keepalive", "", "Duration to keep a model loaded (e.g. 5m)")
runCmd.Flags().Bool("verbose", false, "Show timings for response")
runCmd.Flags().Bool("insecure", false, "Use an insecure registry")
runCmd.Flags().Bool("nowordwrap", false, "Don't wrap words to the next line automatically")
@@ -1090,9 +1156,9 @@ func NewCLI() *cobra.Command {
Environment Variables:
OLLAMA_HOST The host:port to bind to (default "127.0.0.1:11434")
OLLAMA_ORIGINS A comma separated list of allowed origins.
OLLAMA_MODELS The path to the models directory (default is "~/.ollama/models")
OLLAMA_KEEP_ALIVE The duration that models stay loaded in memory (default is "5m")
OLLAMA_ORIGINS A comma separated list of allowed origins
OLLAMA_MODELS The path to the models directory (default "~/.ollama/models")
OLLAMA_KEEP_ALIVE The duration that models stay loaded in memory (default "5m")
OLLAMA_DEBUG Set to 1 to enable additional debug logging
`)
@@ -1123,6 +1189,14 @@ Environment Variables:
PreRunE: checkServerHeartbeat,
RunE: ListHandler,
}
psCmd := &cobra.Command{
Use: "ps",
Short: "List running models",
PreRunE: checkServerHeartbeat,
RunE: ListRunningHandler,
}
copyCmd := &cobra.Command{
Use: "cp SOURCE DESTINATION",
Short: "Copy a model",
@@ -1146,6 +1220,7 @@ Environment Variables:
pullCmd,
pushCmd,
listCmd,
psCmd,
copyCmd,
deleteCmd,
} {
@@ -1160,6 +1235,7 @@ Environment Variables:
pullCmd,
pushCmd,
listCmd,
psCmd,
copyCmd,
deleteCmd,
)
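The parsed `keepalive` value is threaded into the request's `KeepAlive` field; a minimal sketch of setting it directly through the Go client (the model name and duration below are assumptions) could look like:

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/ollama/ollama/api"
)

func main() {
	client, err := api.ClientFromEnvironment()
	if err != nil {
		log.Fatal(err)
	}

	req := &api.GenerateRequest{
		Model:     "llama3",              // assumed model name
		Prompt:    "Why is the sky blue?",
		KeepAlive: &api.Duration{Duration: 10 * time.Minute}, // keep the model loaded for 10m
	}

	err = client.Generate(context.Background(), req, func(resp api.GenerateResponse) error {
		fmt.Print(resp.Response)
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}
```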

View File

@@ -17,6 +17,7 @@ import (
"github.com/ollama/ollama/api"
"github.com/ollama/ollama/progress"
"github.com/ollama/ollama/readline"
"github.com/ollama/ollama/types/errtypes"
)
type MultilineState int
@@ -56,6 +57,11 @@ func loadModel(cmd *cobra.Command, opts *runOptions) error {
Model: opts.Model,
Messages: []api.Message{},
}
if opts.KeepAlive != nil {
chatReq.KeepAlive = opts.KeepAlive
}
err = client.Chat(cmd.Context(), chatReq, func(resp api.ChatResponse) error {
p.StopAndClear()
if len(opts.Messages) > 0 {
@@ -276,13 +282,20 @@ func generateInteractive(cmd *cobra.Command, opts runOptions) error {
fn := func(resp api.ProgressResponse) error { return nil }
err = client.Create(cmd.Context(), req, fn)
if err != nil {
fmt.Println("error: couldn't save model")
if strings.Contains(err.Error(), errtypes.InvalidModelNameErrMsg) {
fmt.Printf("error: The model name '%s' is invalid\n", args[1])
continue
}
return err
}
fmt.Printf("Created new model '%s'\n", args[1])
continue
case strings.HasPrefix(line, "/clear"):
opts.Messages = []api.Message{}
if opts.System != "" {
newMessage := api.Message{Role: "system", Content: opts.System}
opts.Messages = append(opts.Messages, newMessage)
}
fmt.Println("Cleared session context")
continue
case strings.HasPrefix(line, "/set"):

View File

@@ -797,9 +797,9 @@ curl http://localhost:11434/api/show -d '{
```json
{
"modelfile": "# Modelfile generated by \"ollama show\"\n# To build a new Modelfile based on this one, replace the FROM line with:\n# FROM llava:latest\n\nFROM /Users/matt/.ollama/models/blobs/sha256:200765e1283640ffbd013184bf496e261032fa75b99498a9613be4e94d63ad52\nTEMPLATE \"\"\"{{ .System }}\nUSER: {{ .Prompt }}\nASSSISTANT: \"\"\"\nPARAMETER num_ctx 4096\nPARAMETER stop \"\u003c/s\u003e\"\nPARAMETER stop \"USER:\"\nPARAMETER stop \"ASSSISTANT:\"",
"parameters": "num_ctx 4096\nstop \u003c/s\u003e\nstop USER:\nstop ASSSISTANT:",
"template": "{{ .System }}\nUSER: {{ .Prompt }}\nASSSISTANT: ",
"modelfile": "# Modelfile generated by \"ollama show\"\n# To build a new Modelfile based on this one, replace the FROM line with:\n# FROM llava:latest\n\nFROM /Users/matt/.ollama/models/blobs/sha256:200765e1283640ffbd013184bf496e261032fa75b99498a9613be4e94d63ad52\nTEMPLATE \"\"\"{{ .System }}\nUSER: {{ .Prompt }}\nASSISTANT: \"\"\"\nPARAMETER num_ctx 4096\nPARAMETER stop \"\u003c/s\u003e\"\nPARAMETER stop \"USER:\"\nPARAMETER stop \"ASSISTANT:\"",
"parameters": "num_ctx 4096\nstop \u003c/s\u003e\nstop USER:\nstop ASSISTANT:",
"template": "{{ .System }}\nUSER: {{ .Prompt }}\nASSISTANT: ",
"details": {
"format": "gguf",
"family": "llama",

View File

@@ -80,17 +80,19 @@ If Ollama is run as a systemd service, environment variables should be set using
### Setting environment variables on Windows
On windows, Ollama inherits your user and system environment variables.
On Windows, Ollama inherits your user and system environment variables.
1. First Quit Ollama by clicking on it in the task bar
1. First Quit Ollama by clicking on it in the task bar.
2. Edit system environment variables from the control panel
2. Start the Settings (Windows 11) or Control Panel (Windows 10) application and search for _environment variables_.
3. Edit or create New variable(s) for your user account for `OLLAMA_HOST`, `OLLAMA_MODELS`, etc.
3. Click on _Edit environment variables for your account_.
4. Click OK/Apply to save
4. Edit or create a new variable for your user account for `OLLAMA_HOST`, `OLLAMA_MODELS`, etc.
5. Run `ollama` from a new terminal window
5. Click OK/Apply to save.
6. Start the Ollama application from the Windows Start menu.
## How can I expose Ollama on my network?
@@ -237,4 +239,4 @@ If you wish to override the `OLLAMA_KEEP_ALIVE` setting, use the `keep_alive` AP
If too many requests are sent to the server, it will respond with a 503 error
indicating the server is overloaded. You can adjust how many requests may be
queued by setting `OLLAMA_MAX_QUEUE`.

View File

@@ -60,7 +60,9 @@ func humanTime(t time.Time, zeroValue string) string {
}
delta := time.Since(t)
if delta < 0 {
if int(delta.Hours())/24/365 < -20 {
return "Forever"
} else if delta < 0 {
return humanDuration(-delta) + " from now"
}

View File

@@ -32,4 +32,14 @@ func TestHumanTime(t *testing.T) {
v := now.Add(800 * time.Millisecond)
assertEqual(t, HumanTime(v, ""), "Less than a second from now")
})
t.Run("time way in the future", func(t *testing.T) {
v := now.Add(24 * time.Hour * 365 * 200)
assertEqual(t, HumanTime(v, ""), "Forever")
})
t.Run("time way in the future lowercase", func(t *testing.T) {
v := now.Add(24 * time.Hour * 365 * 200)
assertEqual(t, HumanTimeLower(v, ""), "forever")
})
}

View File

@@ -165,7 +165,7 @@ if [ -z "${CUDART_LIB_DIR}" ]; then
CUDART_LIB_DIR="${CUDA_LIB_DIR}"
fi
if [ -d "${CUDA_LIB_DIR}" ]; then
if [ -z "${OLLAMA_SKIP_CUDA_GENERATE}" -a -d "${CUDA_LIB_DIR}" ]; then
echo "CUDA libraries detected - building dynamic CUDA library"
init_vars
CUDA_MAJOR=$(ls "${CUDA_LIB_DIR}"/libcudart.so.* | head -1 | cut -f3 -d. || true)
@@ -227,7 +227,7 @@ if [ -z "${CLBlast_DIR}" ]; then
fi
fi
if [ -d "${ROCM_PATH}" ]; then
if [ -z "${OLLAMA_SKIP_ROCM_GENERATE}" -a -d "${ROCM_PATH}" ]; then
echo "ROCm libraries detected - building dynamic ROCm library"
if [ -f ${ROCM_PATH}/lib/librocblas.so.*.*.????? ]; then
ROCM_VARIANT=_v$(ls ${ROCM_PATH}/lib/librocblas.so.*.*.????? | cut -f5 -d. || true)

View File

@@ -53,6 +53,12 @@ func EstimateGPULayers(gpus []gpu.GpuInfo, ggml *GGML, projectors []string, opts
opts.NumCtx = max(opts.NumCtx, 2048)
}
layers := ggml.Tensors().Layers()
// add one layer worth of memory as a buffer
if blk0, ok := layers["blk.0"]; ok {
memoryMinimum += blk0.size()
}
// fp16 k,v = (1 (k) + 1 (v)) * sizeof(float16) * n_ctx * n_layer * n_embd / n_head * n_head_kv
var kv uint64 = 2 * 2 * uint64(opts.NumCtx) * ggml.KV().BlockCount() * ggml.KV().EmbeddingLength() / ggml.KV().HeadCount() * ggml.KV().HeadCountKV()
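For a rough sense of scale, a sketch of this formula with hyperparameters resembling a 7B Llama-style model (all values assumed: 2048-token context, 32 layers, 4096-dim embeddings, 32 attention heads, 32 KV heads) works out to about 1 GiB of fp16 KV cache:

```go
package main

import "fmt"

func main() {
	// Assumed hyperparameters, roughly a 7B Llama-style model.
	var (
		numCtx      uint64 = 2048 // context length
		blockCount  uint64 = 32   // transformer layers
		embedding   uint64 = 4096 // embedding length
		headCount   uint64 = 32   // attention heads
		headCountKV uint64 = 32   // KV heads (no grouped-query attention)
	)

	// fp16 k,v = (1 (k) + 1 (v)) * sizeof(float16) * n_ctx * n_layer * n_embd / n_head * n_head_kv
	kv := 2 * 2 * numCtx * blockCount * embedding / headCount * headCountKV
	fmt.Printf("%d bytes (~%.1f GiB)\n", kv, float64(kv)/(1<<30))
	// Prints: 1073741824 bytes (~1.0 GiB)
}
```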
@@ -73,13 +79,11 @@ func EstimateGPULayers(gpus []gpu.GpuInfo, ggml *GGML, projectors []string, opts
graphPartialOffload = graphFullOffload
}
layers := ggml.Tensors().Layers()
// memoryRequiredTotal represents the memory required for full GPU offloading (all layers)
memoryRequiredTotal := memoryMinimum + graphFullOffload + layers["blk.0"].size()
memoryRequiredTotal := memoryMinimum + graphFullOffload
// memoryRequiredPartial represents the memory required for partial GPU offloading (n > 0, n < layers)
memoryRequiredPartial := memoryMinimum + graphPartialOffload + layers["blk.0"].size()
memoryRequiredPartial := memoryMinimum + graphPartialOffload
var memoryLayerOutput uint64
if layer, ok := layers["output_norm"]; ok {
@@ -100,15 +104,17 @@ func EstimateGPULayers(gpus []gpu.GpuInfo, ggml *GGML, projectors []string, opts
var layerCount int
for i := 0; i < int(ggml.KV().BlockCount()); i++ {
memoryLayer := layers[fmt.Sprintf("blk.%d", i)].size()
if blk, ok := layers[fmt.Sprintf("blk.%d", i)]; ok {
memoryLayer := blk.size()
// KV is proportional to the number of layers
memoryLayer += kv / ggml.KV().BlockCount()
// KV is proportional to the number of layers
memoryLayer += kv / ggml.KV().BlockCount()
memoryRequiredTotal += memoryLayer
if memoryAvailable > memoryRequiredPartial+memoryLayer {
memoryRequiredPartial += memoryLayer
layerCount++
memoryRequiredTotal += memoryLayer
if (opts.NumGPU >= 0 && layerCount+1 <= opts.NumGPU) || (opts.NumGPU < 0 && memoryAvailable > memoryRequiredPartial+memoryLayer) {
memoryRequiredPartial += memoryLayer
layerCount++
}
}
}
@@ -117,7 +123,7 @@ func EstimateGPULayers(gpus []gpu.GpuInfo, ggml *GGML, projectors []string, opts
memoryRequiredTotal += memoryLayerOutput
}
if memoryAvailable > memoryRequiredTotal {
if (opts.NumGPU >= 0 && layerCount+1 <= opts.NumGPU) || (opts.NumGPU < 0 && memoryAvailable > memoryRequiredTotal) {
layerCount = int(ggml.KV().BlockCount()) + 1
memoryRequiredPartial = memoryRequiredTotal
}
@@ -128,10 +134,10 @@ func EstimateGPULayers(gpus []gpu.GpuInfo, ggml *GGML, projectors []string, opts
"offload to gpu",
slog.Group(
"layers",
// actual number of layers offloaded
"real", opts.NumGPU,
// requested number of layers to offload
"requested", opts.NumGPU,
// estimated number of layers that can be offloaded
"estimate", layerCount,
"real", layerCount,
),
slog.Group(
"memory",

View File

@@ -38,6 +38,7 @@ type LlamaServer interface {
Detokenize(ctx context.Context, tokens []int) (string, error)
Close() error
EstimatedVRAM() uint64
EstimatedTotal() uint64
}
// llmServer is an instance of the llama.cpp server
@@ -88,6 +89,7 @@ func NewLlamaServer(gpus gpu.GpuInfoList, model string, ggml *GGML, adapters, pr
cpuRunner = serverForCpu()
gpuCount = 0
_, _, estimatedTotal = EstimateGPULayers(gpus, ggml, projectors, opts)
} else {
if gpus[0].Library == "metal" {
memInfo, err := gpu.GetCPUMem()
@@ -316,8 +318,22 @@ func NewLlamaServer(gpus gpu.GpuInfoList, model string, ggml *GGML, adapters, pr
}
slog.Info("starting llama server", "cmd", s.cmd.String())
// Log at debug as the environment is inherited and might contain sensitive information
slog.Debug("subprocess", "environment", s.cmd.Env)
if envconfig.Debug {
filteredEnv := []string{}
for _, ev := range s.cmd.Env {
if strings.HasPrefix(ev, "CUDA_") ||
strings.HasPrefix(ev, "ROCM_") ||
strings.HasPrefix(ev, "HIP_") ||
strings.HasPrefix(ev, "HSA_") ||
strings.HasPrefix(ev, "GGML_") ||
strings.HasPrefix(ev, "PATH=") ||
strings.HasPrefix(ev, "LD_LIBRARY_PATH=") {
filteredEnv = append(filteredEnv, ev)
}
}
// Log at debug as the environment is inherited and might contain sensitive information
slog.Debug("subprocess", "environment", filteredEnv)
}
if err = s.cmd.Start(); err != nil {
// Detect permission denied and augment the message about noexec
@@ -955,6 +971,10 @@ func (s *llmServer) EstimatedVRAM() uint64 {
return s.estimatedVRAM
}
func (s *llmServer) EstimatedTotal() uint64 {
return s.estimatedTotal
}
func parseDurationMs(ms float64) time.Duration {
dur, err := time.ParseDuration(fmt.Sprintf("%fms", ms))
if err != nil {

View File

@@ -221,7 +221,7 @@ func (b *blobDownload) downloadChunk(ctx context.Context, requestURL *url.URL, w
}
defer resp.Body.Close()
n, err := io.Copy(w, io.TeeReader(resp.Body, part))
n, err := io.CopyN(w, io.TeeReader(resp.Body, part), part.Size)
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, io.ErrUnexpectedEOF) {
// rollback progress
b.Completed.Add(-n)
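Switching from `io.Copy` to `io.CopyN` bounds each chunk at exactly `part.Size` bytes and surfaces a short read as an error instead of a silent early stop. A small standalone sketch of that standard-library behavior:

```go
package main

import (
	"fmt"
	"io"
	"os"
	"strings"
)

func main() {
	src := strings.NewReader("hello") // only 5 bytes available

	// Ask for exactly 5 bytes: written == 5, err == nil.
	n, err := io.CopyN(os.Stdout, src, 5)
	fmt.Printf("\nwrote %d, err=%v\n", n, err)

	// Ask for more than the source can provide: the shortfall is reported as io.EOF.
	src = strings.NewReader("hello")
	n, err = io.CopyN(io.Discard, src, 10)
	fmt.Printf("wrote %d, err=%v\n", n, err) // wrote 5, err=EOF
}
```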

View File

@@ -30,6 +30,7 @@ import (
"github.com/ollama/ollama/llm"
"github.com/ollama/ollama/openai"
"github.com/ollama/ollama/server/envconfig"
"github.com/ollama/ollama/types/errtypes"
"github.com/ollama/ollama/types/model"
"github.com/ollama/ollama/version"
)
@@ -517,7 +518,7 @@ func (s *Server) CreateModelHandler(c *gin.Context) {
name := model.ParseName(cmp.Or(req.Model, req.Name))
if !name.IsValid() {
c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{"error": "invalid model name"})
c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{"error": errtypes.InvalidModelNameErrMsg})
return
}
@@ -708,7 +709,7 @@ func GetModelInfo(req api.ShowRequest) (*api.ShowResponse, error) {
}
var sb strings.Builder
fmt.Fprintln(&sb, "# Modelfile generate by \"ollama show\"")
fmt.Fprintln(&sb, "# Modelfile generated by \"ollama show\"")
fmt.Fprintln(&sb, "# To build a new Modelfile based on this, replace FROM with:")
fmt.Fprintf(&sb, "# FROM %s\n\n", model.ShortName)
fmt.Fprint(&sb, model.String())
@@ -724,7 +725,7 @@ func (s *Server) ListModelsHandler(c *gin.Context) {
return
}
var models []api.ModelResponse
models := []api.ModelResponse{}
if err := filepath.Walk(manifests, func(path string, info os.FileInfo, _ error) error {
if !info.IsDir() {
rel, err := filepath.Rel(manifests, path)
@@ -979,6 +980,7 @@ func (s *Server) GenerateRoutes() http.Handler {
r.POST("/api/show", s.ShowModelHandler)
r.POST("/api/blobs/:digest", s.CreateBlobHandler)
r.HEAD("/api/blobs/:digest", s.HeadBlobHandler)
r.GET("/api/ps", s.ProcessHandler)
// Compatibility endpoints
r.POST("/v1/chat/completions", openai.Middleware(), s.ChatHandler)
@@ -1137,6 +1139,42 @@ func streamResponse(c *gin.Context, ch chan any) {
})
}
func (s *Server) ProcessHandler(c *gin.Context) {
models := []api.ModelResponse{}
for _, v := range s.sched.loaded {
model := v.model
modelDetails := api.ModelDetails{
Format: model.Config.ModelFormat,
Family: model.Config.ModelFamily,
Families: model.Config.ModelFamilies,
ParameterSize: model.Config.ModelType,
QuantizationLevel: model.Config.FileType,
}
mr := api.ModelResponse{
Model: model.ShortName,
Name: model.ShortName,
Size: int64(v.estimatedTotal),
SizeVRAM: int64(v.estimatedVRAM),
Digest: model.Digest,
Details: modelDetails,
ExpiresAt: v.expiresAt,
}
// The scheduler waits to set expiresAt, so if a model is loading it's
// possible that it will be set to the unix epoch. For those cases, just
// calculate the time w/ the sessionDuration instead.
var epoch time.Time
if v.expiresAt == epoch {
mr.ExpiresAt = time.Now().Add(v.sessionDuration)
}
models = append(models, mr)
}
c.JSON(http.StatusOK, api.ListResponse{Models: models})
}
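For reference, a sketch of querying the new endpoint with only the standard library (assuming the default local address) and decoding into the same response types:

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"

	"github.com/ollama/ollama/api"
)

func main() {
	// Assumes a server listening on the default 127.0.0.1:11434.
	resp, err := http.Get("http://127.0.0.1:11434/api/ps")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	var running api.ListResponse
	if err := json.NewDecoder(resp.Body).Decode(&running); err != nil {
		log.Fatal(err)
	}
	for _, m := range running.Models {
		fmt.Printf("%s: %d bytes total, %d in VRAM, expires %s\n",
			m.Name, m.Size, m.SizeVRAM, m.ExpiresAt)
	}
}
```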
// ChatPrompt builds up a prompt from a series of messages for the currently `loaded` model
func chatPrompt(ctx context.Context, runner *runnerRef, template string, messages []api.Message, numCtx int) (string, error) {
encode := func(s string) ([]int, error) {

View File

@@ -95,6 +95,7 @@ func Test_Routes(t *testing.T) {
err = json.Unmarshal(body, &modelList)
assert.Nil(t, err)
assert.NotNil(t, modelList.Models)
assert.Equal(t, 0, len(modelList.Models))
},
},

View File

@@ -6,6 +6,7 @@ import (
"fmt"
"log/slog"
"reflect"
"runtime"
"sort"
"strings"
"sync"
@@ -177,7 +178,7 @@ func (s *Scheduler) processPending(ctx context.Context) {
}
// Trigger an expiration to unload once it's done
runnerToExpire.refMu.Lock()
slog.Debug("resetting model to expire immediately to make room", "model", runnerToExpire.model, "refCount", runnerToExpire.refCount)
slog.Debug("resetting model to expire immediately to make room", "modelPath", runnerToExpire.modelPath, "refCount", runnerToExpire.refCount)
if runnerToExpire.expireTimer != nil {
runnerToExpire.expireTimer.Stop()
runnerToExpire.expireTimer = nil
@@ -190,13 +191,13 @@ func (s *Scheduler) processPending(ctx context.Context) {
// Wait for the unload to happen
// Note: at this point we're queueing up all incoming requests, even if they were for
// a different model that's loaded and not scheduled to be removed.
slog.Debug("waiting for pending requests to complete and unload to occur", "model", runnerToExpire.model)
slog.Debug("waiting for pending requests to complete and unload to occur", "modelPath", runnerToExpire.modelPath)
select {
case <-ctx.Done():
slog.Debug("shutting down scheduler pending loop")
return
case <-s.unloadedCh:
slog.Debug("unload completed", "model", runnerToExpire.model)
slog.Debug("unload completed", "modelPath", runnerToExpire.modelPath)
continue
}
}
@@ -219,23 +220,23 @@ func (s *Scheduler) processCompleted(ctx context.Context) {
runner := s.loaded[finished.model.ModelPath]
s.loadedMu.Unlock()
if runner == nil {
slog.Error("finished requeset signal received after model unloaded", "model", finished.model.ModelPath)
slog.Error("finished requeset signal received after model unloaded", "modelPath", finished.model.ModelPath)
continue
}
runner.refMu.Lock()
runner.refCount--
if runner.refCount <= 0 {
if runner.sessionDuration <= 0 {
slog.Debug("runner with zero duration has gone idle, expiring to unload", "model", runner.model)
slog.Debug("runner with zero duration has gone idle, expiring to unload", "modelPath", runner.modelPath)
if runner.expireTimer != nil {
runner.expireTimer.Stop()
runner.expireTimer = nil
}
s.expiredCh <- runner
} else if runner.expireTimer == nil {
slog.Debug("runner with non-zero duration has gone idle, adding timer", "model", runner.model, "duration", runner.sessionDuration)
slog.Debug("runner with non-zero duration has gone idle, adding timer", "modelPath", runner.modelPath, "duration", runner.sessionDuration)
runner.expireTimer = time.AfterFunc(runner.sessionDuration, func() {
slog.Debug("timer expired, expiring to unload", "model", runner.model)
slog.Debug("timer expired, expiring to unload", "modelPath", runner.modelPath)
runner.refMu.Lock()
defer runner.refMu.Unlock()
if runner.expireTimer != nil {
@@ -244,19 +245,21 @@ func (s *Scheduler) processCompleted(ctx context.Context) {
}
s.expiredCh <- runner
})
runner.expiresAt = time.Now().Add(runner.sessionDuration)
} else {
slog.Debug("runner with non-zero duration has gone idle, resetting timer", "model", runner.model, "duration", runner.sessionDuration)
slog.Debug("runner with non-zero duration has gone idle, resetting timer", "modelPath", runner.modelPath, "duration", runner.sessionDuration)
runner.expireTimer.Reset(runner.sessionDuration)
runner.expiresAt = time.Now().Add(runner.sessionDuration)
}
}
slog.Debug("after processing request finished event", "model", runner.model, "refCount", runner.refCount)
slog.Debug("after processing request finished event", "modelPath", runner.modelPath, "refCount", runner.refCount)
runner.refMu.Unlock()
case runner := <-s.expiredCh:
slog.Debug("runner expired event received", "model", runner.model)
slog.Debug("runner expired event received", "modelPath", runner.modelPath)
runner.refMu.Lock()
if runner.refCount > 0 {
// Shouldn't happen, but safeguard to ensure no leaked runners
slog.Debug("expired event with positive ref count, retrying", "model", runner.model, "refCount", runner.refCount)
slog.Debug("expired event with positive ref count, retrying", "modelPath", runner.modelPath, "refCount", runner.refCount)
go func(runner *runnerRef) {
// We can't unload yet, but want to as soon as the current request completes
// So queue up another expired event
@@ -268,16 +271,16 @@ func (s *Scheduler) processCompleted(ctx context.Context) {
}
s.loadedMu.Lock()
slog.Debug("got lock to unload", "model", runner.model)
slog.Debug("got lock to unload", "modelPath", runner.modelPath)
finished := runner.waitForVRAMRecovery()
runner.unload()
delete(s.loaded, runner.model)
delete(s.loaded, runner.modelPath)
s.loadedMu.Unlock()
slog.Debug("runner released", "model", runner.model)
slog.Debug("runner released", "modelPath", runner.modelPath)
runner.refMu.Unlock()
<-finished
slog.Debug("sending an unloaded event", "model", runner.model)
slog.Debug("sending an unloaded event", "modelPath", runner.modelPath)
s.unloadedCh <- struct{}{}
}
}
@@ -316,18 +319,20 @@ func (s *Scheduler) load(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList)
req.errCh <- err
return
}
runner := &runnerRef{}
runner.model = req.model.ModelPath
runner.adapters = req.model.AdapterPaths
runner.projectors = req.model.ProjectorPaths
runner.llama = llama
runner.Options = &req.opts
runner.sessionDuration = req.sessionDuration
runner.gpus = gpus
runner.estimatedVRAM = llama.EstimatedVRAM()
runner.loading = true
runner.refCount = 1
runner := &runnerRef{
model: req.model,
modelPath: req.model.ModelPath,
llama: llama,
Options: &req.opts,
sessionDuration: req.sessionDuration,
gpus: gpus,
estimatedVRAM: llama.EstimatedVRAM(),
estimatedTotal: llama.EstimatedTotal(),
loading: true,
refCount: 1,
}
runner.refMu.Lock()
s.loadedMu.Lock()
s.loaded[req.model.ModelPath] = runner
slog.Info("loaded runners", "count", len(s.loaded))
@@ -339,7 +344,7 @@ func (s *Scheduler) load(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList)
slog.Error("error loading llama server", "error", err)
runner.refCount--
req.errCh <- err
slog.Debug("triggering expiration for failed load", "model", runner.model)
slog.Debug("triggering expiration for failed load", "model", runner.modelPath)
s.expiredCh <- runner
return
}
@@ -408,17 +413,18 @@ type runnerRef struct {
refCount uint // prevent unloading if > 0
// unloading bool // set to true when we are trying to unload the runner
llama llm.LlamaServer
loading bool // True only during initial load, then false forever
gpus gpu.GpuInfoList // Recorded at time of provisioning
estimatedVRAM uint64
llama llm.LlamaServer
loading bool // True only during initial load, then false forever
gpus gpu.GpuInfoList // Recorded at time of provisioning
estimatedVRAM uint64
estimatedTotal uint64
sessionDuration time.Duration
expireTimer *time.Timer
expiresAt time.Time
model string
adapters []string
projectors []string
model *Model
modelPath string
*api.Options
}
@@ -431,9 +437,8 @@ func (runner *runnerRef) unload() {
if runner.llama != nil {
runner.llama.Close()
}
runner.model = nil
runner.llama = nil
runner.adapters = nil
runner.projectors = nil
runner.Options = nil
runner.gpus = nil
}
@@ -462,8 +467,8 @@ func (runner *runnerRef) needsReload(ctx context.Context, req *LlmRequest) bool
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
if !reflect.DeepEqual(runner.adapters, req.model.AdapterPaths) || // have the adapters changed?
!reflect.DeepEqual(runner.projectors, req.model.ProjectorPaths) || // have the projectors changed?
if !reflect.DeepEqual(runner.model.AdapterPaths, req.model.AdapterPaths) || // have the adapters changed?
!reflect.DeepEqual(runner.model.ProjectorPaths, req.model.ProjectorPaths) || // have the projectors changed?
!reflect.DeepEqual(optsExisting, optsNew) || // have the runner options changed?
runner.llama.Ping(ctx) != nil {
return true
@@ -483,8 +488,8 @@ func (runner *runnerRef) needsReload(ctx context.Context, req *LlmRequest) bool
func (runner *runnerRef) waitForVRAMRecovery() chan interface{} {
finished := make(chan interface{}, 1)
// CPU or Metal don't need checking, so no waiting required
if len(runner.gpus) == 1 && (runner.gpus[0].Library == "cpu" || runner.gpus[0].Library == "metal") {
// CPU or Metal don't need checking, so no waiting is required; Windows can page VRAM, and the APIs we query tend to be optimistic about free space
if (len(runner.gpus) == 1 && (runner.gpus[0].Library == "cpu" || runner.gpus[0].Library == "metal")) || runtime.GOOS == "windows" {
finished <- struct{}{}
return finished
}

View File

@@ -164,7 +164,8 @@ func TestRequests(t *testing.T) {
// simple reload of same model
scenario2a := newScenario(t, ctx, "ollama-model-1", 20)
scenario2a.req.model = scenario1a.req.model
tmpModel := *scenario1a.req.model
scenario2a.req.model = &tmpModel
scenario2a.ggml = scenario1a.ggml
// Multiple loaded models
@@ -496,10 +497,9 @@ func TestNeedsReload(t *testing.T) {
llm := &mockLlm{}
do := api.DefaultOptions()
runner := &runnerRef{
adapters: []string{"adapter1"},
projectors: []string{"projector1"},
Options: &do,
llama: llm,
model: &Model{AdapterPaths: []string{"adapter1"}, ProjectorPaths: []string{"projector1"}},
Options: &do,
llama: llm,
}
req := &LlmRequest{
model: &Model{
@@ -510,10 +510,10 @@ func TestNeedsReload(t *testing.T) {
}
resp := runner.needsReload(ctx, req)
require.True(t, resp)
req.model.AdapterPaths = runner.adapters
req.model.AdapterPaths = runner.model.AdapterPaths
resp = runner.needsReload(ctx, req)
require.True(t, resp)
req.model.ProjectorPaths = runner.projectors
req.model.ProjectorPaths = runner.model.ProjectorPaths
runner.loading = true
req.opts.NumBatch = 1234
resp = runner.needsReload(ctx, req)
@@ -558,11 +558,11 @@ func TestUnloadAllRunners(t *testing.T) {
func TestUnload(t *testing.T) {
llm1 := &mockLlm{}
r1 := &runnerRef{llama: llm1}
r2 := &runnerRef{adapters: []string{"A"}}
r2 := &runnerRef{model: &Model{AdapterPaths: []string{"A"}}}
r1.unload()
require.True(t, llm1.closeCalled)
r2.unload()
require.Nil(t, r2.adapters)
require.Nil(t, r2.model)
}
type mockLlm struct {
@@ -578,6 +578,7 @@ type mockLlm struct {
closeResp error
closeCalled bool
estimatedVRAM uint64
estimatedTotal uint64
}
func (s *mockLlm) Ping(ctx context.Context) error { return s.pingResp }
@@ -598,4 +599,5 @@ func (s *mockLlm) Close() error {
s.closeCalled = true
return s.closeResp
}
func (s *mockLlm) EstimatedVRAM() uint64 { return s.estimatedVRAM }
func (s *mockLlm) EstimatedTotal() uint64 { return s.estimatedTotal }

View File

@@ -7,6 +7,7 @@ import (
)
const UnknownOllamaKeyErrMsg = "unknown ollama key"
const InvalidModelNameErrMsg = "invalid model name"
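The CLI matches this message by substring (see the `/save` handling in the interactive diff above); a minimal sketch of that pattern, using a hypothetical error value:

```go
package main

import (
	"errors"
	"fmt"
	"strings"

	"github.com/ollama/ollama/types/errtypes"
)

func main() {
	// Hypothetical error, shaped like what the server might return.
	err := errors.New(errtypes.InvalidModelNameErrMsg)

	if err != nil && strings.Contains(err.Error(), errtypes.InvalidModelNameErrMsg) {
		fmt.Println("error: the model name is invalid")
	}
}
```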
// TODO: This should have a structured response from the API
type UnknownOllamaKey struct {