Files
octocam/internal/pipeline/pipeline.go
T
ryan.mcgee 5c7b1482d6 Default overlay to enabled in HTML mode
Fresh installs now show the overlay over the live picture in the web
UI without any server-side overlay processing. Previously the overlay
defaulted to off entirely.

The change is just to loadState()'s default values; both fields are
still read from the state file when present, so existing users who
have explicitly toggled either setting keep their preference. Users
upgrading from before overlay_baked existed get the new HTML default
(was implicitly baked before).

Behavior with the new defaults on a fresh install:
- Master overlay: on
- Overlay type: HTML (baked off)
- runOverlay() drops frames without decoding (no CPU overhead)
- /stream/overlay and /snapshot/overlay return 503 (HTML mode)
- RTSP overlay process does not start
- Web UI shows stats overlaid via CSS on top of the raw stream

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 15:22:44 -05:00

343 lines
11 KiB
Go

package pipeline
import (
"bytes"
"encoding/json"
"fmt"
"image"
"image/draw"
"image/jpeg"
"log"
"os"
"os/exec"
"path/filepath"
"sync/atomic"
"github.com/rfmcgee/octocam/internal/capture"
"github.com/rfmcgee/octocam/internal/config"
"github.com/rfmcgee/octocam/internal/mjpeg"
"github.com/rfmcgee/octocam/internal/octoprint"
"github.com/rfmcgee/octocam/internal/overlay"
)
// Pipeline wires the camera capture to two ffmpeg publishers and two MJPEG broadcasters.
type Pipeline struct {
cfg *config.Config
cam *capture.Camera
client *octoprint.Client
raw *Process
over *Process
rawBC *mjpeg.Broadcaster
overBC *mjpeg.Broadcaster
overlay atomic.Bool // master toggle for the overlay feed (rendering, broadcaster, RTSP)
overlayBaked atomic.Bool // true = render overlay server-side and bake into frames; false = HTML overlay only
overlayRTSPPref atomic.Bool // user preference for the overlay RTSP, even when master is off
}
// New creates a Pipeline. Call Run() to start.
func New(cfg *config.Config, cam *capture.Camera, client *octoprint.Client) *Pipeline {
// Raw stream: MJPEG passthrough — no re-encode, minimal CPU.
rawArgs := ffmpegCopyArgs(cfg.FFmpegBin, cfg.MediaMTXRTSP+"/raw", cfg.FPS)
// Overlay stream: H264 at reduced FPS to keep CPU manageable.
encoder := detectEncoder(cfg.FFmpegBin, cfg.FFmpegHW)
log.Printf("pipeline: overlay RTSP encoder %q at %dfps", encoder, cfg.OverlayRTSPFPS)
overArgs := FFmpegArgs(cfg.FFmpegBin, encoder, cfg.MediaMTXRTSP+"/overlay", cfg.Width, cfg.Height, cfg.OverlayRTSPFPS)
return &Pipeline{
cfg: cfg,
cam: cam,
client: client,
raw: NewProcess("raw", rawArgs),
over: NewProcess("overlay", overArgs),
rawBC: mjpeg.NewBroadcaster(),
overBC: mjpeg.NewBroadcaster(),
}
}
// RawBroadcaster returns the MJPEG broadcaster for the raw stream.
func (p *Pipeline) RawBroadcaster() *mjpeg.Broadcaster { return p.rawBC }
// OverlayBroadcaster returns the MJPEG broadcaster for the overlay stream.
func (p *Pipeline) OverlayBroadcaster() *mjpeg.Broadcaster { return p.overBC }
// FeedState describes the current state of an RTSP feed.
type FeedState struct {
Enabled bool `json:"enabled"`
Running bool `json:"running"`
}
// FeedStates returns the current enabled/running state of both RTSP feeds.
// For the overlay RTSP, Enabled reflects the user preference even when the
// master overlay toggle is off; Running reflects the actual subprocess state.
func (p *Pipeline) FeedStates() map[string]FeedState {
return map[string]FeedState{
"raw": {Enabled: p.raw.Enabled(), Running: p.raw.Running()},
"overlay": {Enabled: p.overlayRTSPPref.Load(), Running: p.over.Running()},
}
}
// OverlayEnabled reports whether the overlay feed (rendering, MJPEG, RTSP) is active.
func (p *Pipeline) OverlayEnabled() bool { return p.overlay.Load() }
// OverlayBaked reports whether the overlay is rendered server-side and baked
// into the frames. When false the overlay is meant to be drawn client-side as
// HTML and the server skips the decode/draw/encode loop entirely.
func (p *Pipeline) OverlayBaked() bool { return p.overlayBaked.Load() }
// SetOverlayEnabled toggles the master overlay feed. When disabled the overlay
// rendering loop drops frames (saving CPU), the overlay broadcaster is cleared
// so /snapshot/overlay returns 503, and the overlay RTSP process is stopped.
// When enabled, the overlay RTSP process is started if the user's RTSP overlay
// preference is on. The user's preference is preserved across master toggles.
func (p *Pipeline) SetOverlayEnabled(enabled bool) {
p.overlay.Store(enabled)
if !enabled {
p.overBC.Clear()
p.over.Disable()
} else if p.overlayRTSPPref.Load() && p.overlayBaked.Load() {
p.over.Enable()
}
p.saveState()
}
// SetOverlayBaked switches between baked (server-rendered, in-frame) and HTML
// (client-rendered, on webpage) overlay rendering. Switching to HTML clears
// the overlay broadcaster's cached frame and stops the overlay RTSP process
// since the overlay is no longer in the video stream. Switching back to baked
// restarts the RTSP process if the user preference is on.
func (p *Pipeline) SetOverlayBaked(baked bool) {
p.overlayBaked.Store(baked)
if !baked {
p.overBC.Clear()
p.over.Disable()
} else if p.overlay.Load() && p.overlayRTSPPref.Load() {
p.over.Enable()
}
p.saveState()
}
// pipelineState is the on-disk representation of runtime toggles.
type pipelineState struct {
OverlayEnabled bool `json:"overlay_enabled"`
OverlayBaked bool `json:"overlay_baked"`
RTSPRaw bool `json:"raw"`
RTSPOverlay bool `json:"overlay"`
}
func (p *Pipeline) loadState() pipelineState {
// Defaults on first run: overlay enabled in HTML mode (no server-side
// JPEG decode/draw/encode), both RTSP feeds off. The state file overrides
// any field it specifies, so toggles set by the user persist across reboots.
state := pipelineState{OverlayEnabled: true}
data, err := os.ReadFile(p.cfg.StateFile)
if err != nil {
return state // file absent = first run, use defaults
}
if err := json.Unmarshal(data, &state); err != nil {
log.Printf("pipeline: state file corrupt, using defaults: %v", err)
}
return state
}
func (p *Pipeline) saveState() {
state := pipelineState{
OverlayEnabled: p.overlay.Load(),
OverlayBaked: p.overlayBaked.Load(),
RTSPRaw: p.raw.Enabled(),
RTSPOverlay: p.overlayRTSPPref.Load(),
}
data, err := json.Marshal(state)
if err != nil {
return
}
if err := os.MkdirAll(filepath.Dir(p.cfg.StateFile), 0750); err != nil {
log.Printf("pipeline: could not create state dir: %v", err)
return
}
if err := os.WriteFile(p.cfg.StateFile, data, 0640); err != nil {
log.Printf("pipeline: could not save state: %v", err)
}
}
// SetFeed enables or disables the named RTSP feed ("raw" or "overlay") and persists the change.
// Setting the RTSP overlay preference is always honored, but the process only
// runs when the master overlay toggle is also on.
func (p *Pipeline) SetFeed(name string, enabled bool) {
switch name {
case "raw":
if enabled {
p.raw.Enable()
} else {
p.raw.Disable()
}
case "overlay":
p.overlayRTSPPref.Store(enabled)
// RTSP overlay only runs if master overlay is on AND the overlay is
// baked into frames — HTML overlay mode has no video-side overlay.
if p.overlay.Load() && p.overlayBaked.Load() {
if enabled {
p.over.Enable()
} else {
p.over.Disable()
}
}
default:
return
}
p.saveState()
}
// Run loads persisted state then starts the frame distribution loops.
// The overlay feed and both RTSP processes are only activated if their saved
// state is enabled. Default for all toggles is off.
func (p *Pipeline) Run() {
state := p.loadState()
log.Printf("pipeline: state — overlay=%v baked=%v rtsp_raw=%v rtsp_overlay=%v",
state.OverlayEnabled, state.OverlayBaked, state.RTSPRaw, state.RTSPOverlay)
p.overlay.Store(state.OverlayEnabled)
p.overlayBaked.Store(state.OverlayBaked)
p.overlayRTSPPref.Store(state.RTSPOverlay)
if state.RTSPRaw {
if err := p.raw.Start(); err != nil {
log.Printf("pipeline: raw ffmpeg start error: %v", err)
}
} else {
p.raw.disabled.Store(true)
}
// Overlay RTSP only runs if master overlay is on, overlay is baked, and the
// user's RTSP overlay preference is on.
if state.RTSPOverlay && state.OverlayEnabled && state.OverlayBaked {
if err := p.over.Start(); err != nil {
log.Printf("pipeline: overlay ffmpeg start error: %v", err)
}
} else {
p.over.disabled.Store(true)
}
go p.runRaw()
go p.runOverlay()
}
// Stop gracefully shuts down both ffmpeg subprocesses.
func (p *Pipeline) Stop() {
p.raw.Stop()
p.over.Stop()
}
// runRaw publishes every captured frame to the raw HTTP broadcaster and
// forwards it to the raw RTSP ffmpeg process when that's running.
//
// INVARIANT: the raw broadcaster is published to unconditionally — there is no
// toggle that suppresses it. This guarantees that /stream/raw, /snapshot/raw,
// /webcam/?action=stream, and /webcam/?action=snapshot always serve fresh,
// un-overlayed frames, which is what OctoPrint's Classic Webcam plugin uses
// for live feeds and timelapse snapshots.
func (p *Pipeline) runRaw() {
for frame := range p.cam.RawCh() {
// Broadcast to HTTP viewers first (no encode cost).
p.rawBC.Publish(frame.Data)
// Pass raw JPEG to ffmpeg for RTSP (copy mode — no re-encode).
if err := p.raw.WriteFrame(frame.Data); err != nil {
log.Printf("pipeline: raw write error: %v", err)
}
}
}
func (p *Pipeline) runOverlay() {
// Only send every Nth frame to the overlay RTSP encoder to limit CPU.
// HTTP viewers always receive the full rate.
rtspEvery := 1
if p.cfg.OverlayRTSPFPS > 0 && p.cfg.FPS > p.cfg.OverlayRTSPFPS {
rtspEvery = p.cfg.FPS / p.cfg.OverlayRTSPFPS
}
frameNum := 0
for frame := range p.cam.OverlayCh() {
if !p.overlay.Load() || !p.overlayBaked.Load() {
// Master toggle off, or HTML overlay mode — skip the decode/draw/encode.
continue
}
out, err := applyOverlay(frame.Data, p.client.Snapshot(), p.cfg.Quality)
if err != nil {
log.Printf("pipeline: overlay error: %v", err)
continue
}
// HTTP: every frame.
p.overBC.Publish(out)
// RTSP: throttled.
frameNum++
if frameNum%rtspEvery == 0 {
if err := p.over.WriteFrame(out); err != nil {
log.Printf("pipeline: overlay write error: %v", err)
}
}
}
}
// applyOverlay decodes a JPEG, draws the overlay, and re-encodes it.
func applyOverlay(jpegData []byte, snap *octoprint.Snapshot, quality int) ([]byte, error) {
src, err := jpeg.Decode(bytes.NewReader(jpegData))
if err != nil {
return nil, err
}
rgba := image.NewRGBA(src.Bounds())
draw.Draw(rgba, rgba.Bounds(), src, image.Point{}, draw.Src)
overlay.DrawOverlay(rgba, snap)
var buf bytes.Buffer
if err := jpeg.Encode(&buf, rgba, &jpeg.Options{Quality: quality}); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// ffmpegCopyArgs builds ffmpeg args for MJPEG passthrough to RTSP (no re-encode).
func ffmpegCopyArgs(bin, rtspURL string, fps int) []string {
return []string{
bin,
"-loglevel", "warning",
"-f", "mjpeg",
"-r", fmt.Sprintf("%d", fps),
"-i", "pipe:0",
"-c:v", "copy",
"-f", "rtsp",
"-rtsp_transport", "tcp",
rtspURL,
}
}
// detectEncoder probes for the best available hardware encoder.
func detectEncoder(bin, pref string) string {
if pref != "auto" {
return pref
}
for _, enc := range []string{"h264_v4l2m2m", "h264_rkmpp"} {
if probeEncoder(bin, enc) {
return enc
}
}
return "libx264"
}
// probeEncoder runs a one-frame test encode to verify the encoder works on this hardware.
func probeEncoder(bin, enc string) bool {
cmd := exec.Command(bin,
"-hide_banner", "-loglevel", "error",
"-f", "lavfi", "-i", "color=black:s=64x64:r=1",
"-frames:v", "1",
"-c:v", enc,
"-f", "null", "/dev/null",
)
if err := cmd.Run(); err != nil {
log.Printf("pipeline: encoder %q not available: %v", enc, err)
return false
}
return true
}