diff --git a/objectstore/client.go b/objectstore/client.go index db6013b..5f7246d 100644 --- a/objectstore/client.go +++ b/objectstore/client.go @@ -15,6 +15,13 @@ import ( "github.com/minio/minio-go/v7/pkg/credentials" ) +// blobUploadPartSize/blobUploadThreads bound PutStreaming memory at +// partSize*threads while keeping concurrency for multi-GiB blobs. +const ( + blobUploadPartSize = uint64(64) << 20 + blobUploadThreads = uint(4) +) + // ErrNotFound is returned when a requested object does not exist. var ErrNotFound = errors.New("not found") @@ -58,6 +65,21 @@ func (c *Client) Put(ctx context.Context, key string, body io.Reader, size int64 return nil } +// PutStreaming uploads body to key via concurrent multipart without buffering +// the whole object, reading from a non-seekable source. +func (c *Client) PutStreaming(ctx context.Context, key string, body io.Reader, size int64) error { + _, err := c.client.PutObject(ctx, c.cfg.Bucket, c.fullKey(key), body, size, minio.PutObjectOptions{ + ContentType: "application/octet-stream", + PartSize: blobUploadPartSize, + NumThreads: blobUploadThreads, + ConcurrentStreamParts: true, + }) + if err != nil { + return fmt.Errorf("put streaming %s: %w", key, err) + } + return nil +} + // PresignGet returns a time-limited URL for a direct GET, bypassing this process. func (c *Client) PresignGet(ctx context.Context, key string, ttl time.Duration) (string, error) { u, err := c.client.PresignedGetObject(ctx, c.cfg.Bucket, c.fullKey(key), ttl, nil) diff --git a/registry/registry.go b/registry/registry.go index 68687ad..c7339c6 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -64,6 +64,12 @@ func (r *Registry) PushBlobFromStream(ctx context.Context, digest string, body i return r.client.Put(ctx, blobKey(digest), body, size) } +// PushBlobStreaming streams a blob straight to the object store without +// buffering it whole. The caller verifies the digest after the stream drains. +func (r *Registry) PushBlobStreaming(ctx context.Context, digest string, body io.Reader, size int64) error { + return r.client.PutStreaming(ctx, blobKey(digest), body, size) +} + // BlobExists reports whether a blob with the given digest exists. func (r *Registry) BlobExists(ctx context.Context, digest string) (bool, error) { return r.client.Exists(ctx, blobKey(digest)) diff --git a/server/registry_v2_uploads.go b/server/registry_v2_uploads.go index 5f6c004..d26ba8d 100644 --- a/server/registry_v2_uploads.go +++ b/server/registry_v2_uploads.go @@ -1,14 +1,26 @@ package server import ( + "context" + "crypto/sha256" + "encoding/hex" "errors" "fmt" "io" "net/http" "strconv" + "time" + + "github.com/projecteru2/core/log" ) -const uploadBodyLimit = defaultUploadMaxBytes +const ( + uploadBodyLimit = defaultUploadMaxBytes + + // discardBlobTimeout bounds the detached corrupt-blob delete; the object + // store layer imposes no request timeout of its own. + discardBlobTimeout = 30 * time.Second +) func (s *Server) v2InitBlobUpload(w http.ResponseWriter, r *http.Request) { name := urlVar(r, "name") @@ -20,6 +32,7 @@ func (s *Server) v2InitBlobUpload(w http.ResponseWriter, r *http.Request) { id, err := s.uploads.Start() if err != nil { + log.WithFunc("server.v2InitBlobUpload").Errorf(r.Context(), err, "start upload session failed") v2Error(w, http.StatusInternalServerError, "INTERNAL_ERROR", err.Error()) return } @@ -78,31 +91,79 @@ func (s *Server) v2CompleteBlobUpload(w http.ResponseWriter, r *http.Request) { s.persistVerifiedBlob(w, r, name, digest, fu) } +// persistMonolithicUpload streams a single-PUT blob straight to the digest key +// while hashing inline (no disk spool), then verifies. A mismatch discards the +// object: epoch's read path does not re-hash, so the digest key must only ever +// hold verified bytes. func (s *Server) persistMonolithicUpload(w http.ResponseWriter, r *http.Request, name, digest string) { - id, err := s.uploads.Start() + dgst := stripSHA256Prefix(digest) + + exists, err := s.reg.BlobExists(r.Context(), dgst) if err != nil { + // fail closed: streaming on could overwrite then delete an existing blob. + log.WithFunc("server.persistMonolithicUpload").Errorf(r.Context(), err, "blob exists check for sha256:%s failed", dgst) v2Error(w, http.StatusInternalServerError, "INTERNAL_ERROR", err.Error()) return } - body := io.LimitReader(r.Body, uploadBodyLimit) - if _, appendErr := s.uploads.Append(id, body); appendErr != nil { - drainBody(body) - s.uploads.Cancel(id) - writeUploadAppendError(w, appendErr) + if exists { + drainBody(r.Body) // keep the connection reusable + s.blobCreated(w, name, digest) return } - fu, err := s.uploads.Finalize(id) - if err != nil { - drainBody(body) - writeUploadAppendError(w, err) + + release, ok := s.acquireStreamSlot(r.Context()) + if !ok { + log.WithFunc("server.persistMonolithicUpload").Debugf(r.Context(), "upload canceled while queued for a streaming slot") return } - defer func() { _ = fu.Close() }() + defer release() - s.persistVerifiedBlob(w, r, name, digest, fu) + hasher := sha256.New() + body := io.TeeReader(io.LimitReader(r.Body, uploadBodyLimit), hasher) + if err := s.reg.PushBlobStreaming(r.Context(), dgst, body, r.ContentLength); err != nil { + log.WithFunc("server.persistMonolithicUpload").Errorf(r.Context(), err, "stream blob sha256:%s (content-length=%d) failed", dgst, r.ContentLength) + v2Error(w, http.StatusInternalServerError, "INTERNAL_ERROR", err.Error()) + return + } + + if got := "sha256:" + hex.EncodeToString(hasher.Sum(nil)); got != digest { + s.discardCorruptBlob(r.Context(), dgst) + v2Error(w, http.StatusBadRequest, "DIGEST_INVALID", + fmt.Sprintf("digest mismatch: got %s, expected %s", got, digest)) + return + } + s.blobCreated(w, name, digest) +} + +// discardCorruptBlob deletes a digest-mismatched blob on a detached, bounded +// context: a delete suppressed by request cancellation would leave unverified +// bytes the dedup path later trusts. A failure is logged for a backstop sweep. +func (s *Server) discardCorruptBlob(ctx context.Context, digest string) { + ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), discardBlobTimeout) + defer cancel() + if err := s.reg.DeleteBlob(ctx, digest); err != nil { + log.WithFunc("server.discardCorruptBlob").Errorf(ctx, err, + "delete corrupt blob sha256:%s failed; digest key holds unverified bytes the dedup path will trust", digest) + } +} + +// acquireStreamSlot blocks for one of the bounded streaming-upload slots so +// concurrent 256 MiB uploads can't exhaust memory; ok is false if ctx ends first. +func (s *Server) acquireStreamSlot(ctx context.Context) (release func(), ok bool) { + if s.streamSem == nil { + return func() {}, true // unbounded when built without New + } + select { + case s.streamSem <- struct{}{}: + return func() { <-s.streamSem }, true + case <-ctx.Done(): + return nil, false + } } // persistVerifiedBlob verifies the digest then streams to the object store. +// Used by the chunked PATCH upload path, where the full blob is spooled to +// disk first so the digest can be checked before it reaches the object store. func (s *Server) persistVerifiedBlob(w http.ResponseWriter, r *http.Request, name, digest string, fu *FinalizedUpload) { if got := fu.Digest(); got != digest { v2Error(w, http.StatusBadRequest, "DIGEST_INVALID", @@ -112,14 +173,19 @@ func (s *Server) persistVerifiedBlob(w http.ResponseWriter, r *http.Request, nam rdr, err := fu.Reader() if err != nil { + log.WithFunc("server.persistVerifiedBlob").Errorf(r.Context(), err, "open spooled blob %s failed", digest) v2Error(w, http.StatusInternalServerError, "INTERNAL_ERROR", err.Error()) return } if err := s.reg.PushBlobFromStream(r.Context(), stripSHA256Prefix(digest), rdr, fu.Size()); err != nil { - v2Error(w, http.StatusInternalServerError, "BLOB_UPLOAD_INVALID", err.Error()) + log.WithFunc("server.persistVerifiedBlob").Errorf(r.Context(), err, "push spooled blob %s (size=%d) failed", digest, fu.Size()) + v2Error(w, http.StatusInternalServerError, "INTERNAL_ERROR", err.Error()) return } + s.blobCreated(w, name, digest) +} +func (s *Server) blobCreated(w http.ResponseWriter, name, digest string) { w.Header().Set("Location", fmt.Sprintf("/v2/%s/blobs/%s", name, digest)) w.Header().Set("Docker-Content-Digest", digest) w.WriteHeader(http.StatusCreated) diff --git a/server/server.go b/server/server.go index 73f89ec..52ddfbe 100644 --- a/server/server.go +++ b/server/server.go @@ -9,6 +9,7 @@ import ( "net" "net/http" "os" + "strconv" "time" commonhttpx "github.com/cocoonstack/cocoon-common/httpx" @@ -22,8 +23,9 @@ import ( ) const ( - defaultUploadSpoolDir = "/var/cache/epoch/uploads" - shutdownTimeout = 15 * time.Second + defaultUploadSpoolDir = "/var/cache/epoch/uploads" + shutdownTimeout = 15 * time.Second + defaultMaxStreamingUploads = 8 ) var _ http.ResponseWriter = (*responseWriter)(nil) @@ -42,6 +44,7 @@ type Server struct { router *mux.Router // runtime uploads *uploadSessions // runtime uiHandler http.Handler // runtime + streamSem chan struct{} // runtime — caps concurrent 256 MiB streaming uploads } // New creates a Server with routes, auth, and upload sessions configured. @@ -62,6 +65,8 @@ func New(ctx context.Context, reg *registry.Registry, st *store.Store, addr stri if blobRedirect { logger.Infof(ctx, "blob redirect enabled, ttl=%s", blobRedirectTTL) } + maxStreams := resolveMaxStreamingUploads(os.Getenv("EPOCH_MAX_STREAMING_UPLOADS")) + logger.Infof(ctx, "max concurrent streaming uploads: %d", maxStreams) s := &Server{ addr: addr, registryToken: regToken, @@ -72,6 +77,7 @@ func New(ctx context.Context, reg *registry.Registry, st *store.Store, addr stri store: st, router: mux.NewRouter(), uploads: newUploadSessions(resolveUploadDir(ctx)), + streamSem: make(chan struct{}, maxStreams), } s.setupRoutes(ctx) return s @@ -226,6 +232,14 @@ func resolveUploadDir(ctx context.Context) string { return fallback } +// resolveMaxStreamingUploads caps concurrent streaming uploads; size to mem_limit / 256 MiB. +func resolveMaxStreamingUploads(raw string) int { + if n, err := strconv.Atoi(raw); err == nil && n > 0 { + return n + } + return defaultMaxStreamingUploads +} + func newHTTPServer(ctx context.Context, addr string, handler http.Handler) *http.Server { srv := commonhttpx.NewServer(addr, handler) srv.IdleTimeout = 60 * time.Second