Files
ollama-for-amd/server/internal/internal/syncs/line.go
Blake Mizerany 348b3e0983 server/internal: copy bmizerany/ollama-go to internal package (#9294)
This commit copies (without history) the bmizerany/ollama-go repository
with the intention of integrating it into the ollama as a replacement
for the pushing, and pulling of models, and management of the cache they
are pushed and pulled from.

New homes for these packages will be determined as they are integrated
and we have a better understanding of proper package boundaries.
2025-02-24 22:39:44 -08:00

202 lines
4.6 KiB
Go

// Package syncs provides synchronization primitives.
package syncs
import (
"cmp"
"io"
"sync"
)
var closedChan = func() chan struct{} {
ch := make(chan struct{})
close(ch)
return ch
}()
// Ticket represents a ticket in a sequence of tickets. The zero value is
// invalid. Use [Line.Take] to get a valid ticket.
//
// A Ticket is not safe for concurrent use.
type Ticket struct {
ahead chan struct{} // ticket ahead of this one
ch chan struct{}
}
// Ready returns a channel that is closed when the ticket before this one is
// done.
//
// It is incorrect to wait on Ready after the ticket is done.
func (t *Ticket) Ready() chan struct{} {
return cmp.Or(t.ahead, closedChan)
}
// Done signals that this ticket is done and that the next ticket in line can
// proceed.
//
// The first call to [Done] unblocks the ticket after it, if any. Subsequent
// calls are no-ops.
func (t *Ticket) Done() {
if t.ch != nil {
close(t.ch)
}
t.ch = nil
}
// Line is an ordered sequence of tickets waiting for their turn to proceed.
//
// To get a ticket use [Line.Take].
// To signal that a ticket is done use [Ticket.Done].
// To wait your turn use [Ticket.Ready].
//
// A Line is not safe for concurrent use.
type Line struct {
last chan struct{} // last ticket in line
}
func (q *Line) Take() *Ticket {
t := &Ticket{
ahead: q.last,
ch: make(chan struct{}),
}
q.last = t.ch
return t
}
// RelayReader implements an [io.WriterTo] that yields the passed
// writer to its [WriteTo] method each [io.WriteCloser] taken from [Take], in
// the order they are taken. Each [io.WriteCloser] blocks until the previous
// one is closed, or a call to [RelayReader.CloseWithError] is made.
//
// The zero value is invalid. Use [NewWriteToLine] to get a valid RelayReader.
//
// It is not safe for concurrent use.
type RelayReader struct {
line Line
t *Ticket
w io.Writer
n int64
mu sync.Mutex
err error // set by CloseWithError
closedCh chan struct{} // closed if err is set
}
var (
_ io.Closer = (*RelayReader)(nil)
_ io.WriterTo = (*RelayReader)(nil)
_ io.Reader = (*RelayReader)(nil)
)
func NewRelayReader() *RelayReader {
var q RelayReader
q.closedCh = make(chan struct{})
q.t = q.line.Take()
return &q
}
// CloseWithError terminates the line, unblocking any writer waiting for its
// turn with the error, or [io.EOF] if err is nil. It is safe to call
// [CloseWithError] multiple times and across multiple goroutines.
//
// If the line is already closed, [CloseWithError] is a no-op.
//
// It never returns an error.
func (q *RelayReader) CloseWithError(err error) error {
q.mu.Lock()
defer q.mu.Unlock()
if q.err == nil {
q.err = cmp.Or(q.err, err, io.EOF)
close(q.closedCh)
}
return nil
}
// Close closes the line. Any writer waiting for its turn will be unblocked
// with an [io.ErrClosedPipe] error.
//
// It never returns an error.
func (q *RelayReader) Close() error {
return q.CloseWithError(nil)
}
func (q *RelayReader) closed() <-chan struct{} {
q.mu.Lock()
defer q.mu.Unlock()
return q.closedCh
}
func (q *RelayReader) Read(p []byte) (int, error) {
panic("RelayReader.Read is for show only; use WriteTo")
}
// WriteTo yields the writer w to the first writer in line and blocks until the
// first call to [Close].
//
// It is safe to call [Take] concurrently with [WriteTo].
func (q *RelayReader) WriteTo(dst io.Writer) (int64, error) {
select {
case <-q.closed():
return 0, io.ErrClosedPipe
default:
}
// We have a destination writer; let the relay begin.
q.w = dst
q.t.Done()
<-q.closed()
return q.n, nil
}
// Take returns a writer that will be passed to the next writer in line.
//
// It is not safe for use across multiple goroutines.
func (q *RelayReader) Take() io.WriteCloser {
return &relayWriter{q: q, t: q.line.Take()}
}
type relayWriter struct {
q *RelayReader
t *Ticket
ready bool
}
var _ io.StringWriter = (*relayWriter)(nil)
// Write writes to the writer passed to [RelayReader.WriteTo] as soon as the
// writer is ready. It returns io.ErrClosedPipe if the line is closed before
// the writer is ready.
func (w *relayWriter) Write(p []byte) (int, error) {
if !w.awaitTurn() {
return 0, w.q.err
}
n, err := w.q.w.Write(p)
w.q.n += int64(n)
return n, err
}
func (w *relayWriter) WriteString(s string) (int, error) {
if !w.awaitTurn() {
return 0, w.q.err
}
return io.WriteString(w.q.w, s)
}
// Close signals that the writer is done, unblocking the next writer in line.
func (w *relayWriter) Close() error {
w.t.Done()
return nil
}
func (t *relayWriter) awaitTurn() (ok bool) {
if t.ready {
return true
}
select {
case <-t.t.Ready():
t.ready = true
return true
case <-t.q.closed():
return false
}
}