From 1a2feb2a970c8331e1d34f68877190c169999c44 Mon Sep 17 00:00:00 2001
From: Michael Yang
Date: Fri, 10 Oct 2025 16:38:12 -0700
Subject: [PATCH] ollamarunner: fix deadlock

Sending on hardErrCh deadlocks: the run loop that would receive the
error is blocked in forwardBatch waiting on computeStartedCh, which
never gets sent. Since the only response to hardErrCh is to panic,
just panic at the failure site instead.
---
 runner/ollamarunner/runner.go | 16 +++-------------
 1 file changed, 3 insertions(+), 13 deletions(-)

diff --git a/runner/ollamarunner/runner.go b/runner/ollamarunner/runner.go
index 22ec7685..28d9d2c9 100644
--- a/runner/ollamarunner/runner.go
+++ b/runner/ollamarunner/runner.go
@@ -321,9 +321,6 @@ type Server struct {
 	// TODO (jmorganca): make this n_batch
 	batchSize int
 
-	// Used to signal a hard failure during async processing which will panic the runner
-	hardErrCh chan error
-
 	// Simple counter used only for trace logging batches
 	batchID int
 
@@ -411,8 +408,6 @@ func (s *Server) run(ctx context.Context) {
 		select {
 		case <-ctx.Done():
 			return
-		case err := <-s.hardErrCh:
-			panic(err)
 		default:
 			var err error
 			nextBatch, err := s.forwardBatch(previousBatch)
@@ -663,9 +658,7 @@ func (s *Server) computeBatch(activeBatch batchState) {
 			// don't sample prompt processing
 			if len(seq.inputs) != 0 {
 				if !s.cache.enabled {
-					s.hardErrCh <- fmt.Errorf("caching disabled but unable to fit entire input in a batch")
-					s.mu.Unlock()
-					return
+					panic("caching disabled but unable to fit entire input in a batch")
 				}
 				continue
 			}
@@ -720,8 +713,7 @@
 		logutil.Trace("computeBatch: vocab details", "batchID", activeBatch.id, "seqIdx", i, "len(logits)", len(outputs), "len(activeBatch.batch.Outputs)", activeBatch.batch.Outputs.Dim(0), "vocabSize", vocabSize, "iBatches", iBatches)
 		token, err := seq.sampler.Sample(outputs[iBatches[i]*vocabSize : (iBatches[i]+1)*vocabSize])
 		if err != nil {
-			s.hardErrCh <- fmt.Errorf("failed to sample token: %w", err)
-			return
+			panic("failed to sample token")
 		}
 
 		nextBatchTokens[i].Token = token
@@ -738,8 +730,7 @@
 
 		piece, err := s.model.(model.TextProcessor).Decode([]int32{token})
 		if err != nil {
-			s.hardErrCh <- fmt.Errorf("failed to decode token: %w", err)
-			return
+			panic("failed to decode token")
 		}
 
 		seq.pendingResponses = append(seq.pendingResponses, piece)
@@ -1321,7 +1312,6 @@ func Execute(args []string) error {
 	server := &Server{
 		modelPath: *mpath,
 		status:    llm.ServerStatusLaunched,
-		hardErrCh: make(chan error, 1),
 	}
 
 	server.cond = sync.NewCond(&server.mu)
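
For reference, below is a minimal sketch of the failure mode the commit message describes, using hypothetical stand-ins for the run loop, forwardBatch, and computeBatch. Only the channel names mirror the patch; everything else is simplified for illustration and is not the runner's actual code.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical stand-ins modeled on the runner: hardErrCh is buffered
	// with capacity 1; computeStartedCh signals that compute has begun.
	hardErrCh := make(chan error, 1)
	computeStartedCh := make(chan struct{})

	// Run-loop stand-in: it only drains hardErrCh between batches.
	// Otherwise it is inside "forwardBatch", blocked on computeStartedCh.
	go func() {
		for {
			select {
			case err := <-hardErrCh:
				panic(err) // old behavior: panic from the run loop
			default:
				<-computeStartedCh // forwardBatch: wait for compute to start
			}
		}
	}()

	// Compute stand-in: it hits a hard error after the run loop is already
	// blocked, reports it, and returns WITHOUT signaling computeStartedCh.
	time.Sleep(100 * time.Millisecond)
	hardErrCh <- fmt.Errorf("hard failure during async processing")

	// The send above succeeds (the channel is buffered), but the run loop
	// never returns to its select, so the error is never received and the
	// runner hangs instead of panicking.
	time.Sleep(time.Second)
	fmt.Println("run loop wedged in forwardBatch; hardErrCh never drained")
}

Note that because hardErrCh is buffered, the send itself does not block; the hang is that the receiver never gets back to its select. Panicking directly at the failure site, as the patch does, removes any dependency on the run loop being able to receive.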