install go2rtc on bob

This commit is contained in:
2026-04-04 19:36:14 +02:00
parent f0b56e63d1
commit ccf88187b8
537 changed files with 69213 additions and 0 deletions
@@ -0,0 +1,141 @@
# Streams
This core module is responsible for managing the stream list.
## Stream to camera
[`new in v1.3.0`](https://github.com/AlexxIT/go2rtc/releases/tag/v1.3.0)
go2rtc supports playing audio files (ex. music or [TTS](https://www.home-assistant.io/integrations/#text-to-speech)) and live streams (ex. radio) on cameras with [two-way audio](../../README.md#two-way-audio) support.
API example:
```text
POST http://localhost:1984/api/streams?dst=camera1&src=ffmpeg:http://example.com/song.mp3#audio=pcma#input=file
```
- you can stream: local files, web files, live streams or any format, supported by FFmpeg
- you should use [ffmpeg source](../ffmpeg/README.md) for transcoding audio to codec, that your camera supports
- you can check camera codecs on the go2rtc WebUI info page when the stream is active
- some cameras support only low quality `PCMA/8000` codec (ex. [Tapo](../tapo/README.md))
- it is recommended to choose higher quality formats if your camera supports them (ex. `PCMA/48000` for some Dahua cameras)
- if you play files over `http` link, you need to add `#input=file` params for transcoding, so the file will be transcoded and played in real time
- if you play live streams, you should skip `#input` param, because it is already in real time
- you can stop active playback by calling the API with the empty `src` parameter
- you will see one active producer and one active consumer in go2rtc WebUI info page during streaming
## Publish stream
[`new in v1.8.0`](https://github.com/AlexxIT/go2rtc/releases/tag/v1.8.0)
You can publish any stream to streaming services (YouTube, Telegram, etc.) via RTMP/RTMPS. Important:
- Supported codecs: H264 for video and AAC for audio
- AAC audio is required for YouTube; videos without audio will not work
- You don't need to enable [RTMP module](../rtmp/README.md) listening for this task
You can use the API:
```text
POST http://localhost:1984/api/streams?src=camera1&dst=rtmps://...
```
Or config file:
```yaml
publish:
# publish stream "video_audio_transcode" to Telegram
video_audio_transcode:
- rtmps://xxx-x.rtmp.t.me/s/xxxxxxxxxx:xxxxxxxxxxxxxxxxxxxxxx
# publish stream "audio_transcode" to Telegram and YouTube
audio_transcode:
- rtmps://xxx-x.rtmp.t.me/s/xxxxxxxxxx:xxxxxxxxxxxxxxxxxxxxxx
- rtmp://xxx.rtmp.youtube.com/live2/xxxx-xxxx-xxxx-xxxx-xxxx
streams:
video_audio_transcode:
- ffmpeg:rtsp://user:pass@192.168.1.123/stream1#video=h264#hardware#audio=aac
audio_transcode:
- ffmpeg:rtsp://user:pass@192.168.1.123/stream1#video=copy#audio=aac
```
- **Telegram Desktop App** > Any public or private channel or group (where you admin) > Live stream > Start with... > Start streaming.
- **YouTube** > Create > Go live > Stream latency: Ultra low-latency > Copy: Stream URL + Stream key.
## Preload stream
[`new in v1.9.11`](https://github.com/AlexxIT/go2rtc/releases/tag/v1.9.11)
You can preload any stream on go2rtc start. This is useful for cameras that take a long time to start up.
```yaml
preload:
camera1: # default: video&audio = ANY
camera2: "video" # preload only video track
camera3: "video=h264&audio=opus" # preload H264 video and OPUS audio
streams:
camera1:
- rtsp://192.168.1.100/stream
camera2:
- rtsp://192.168.1.101/stream
camera3:
- rtsp://192.168.1.102/h265stream
- ffmpeg:camera3#video=h264#audio=opus#hardware
```
## Examples
```yaml
streams:
# known RTSP sources
rtsp-dahua1: rtsp://admin:password@192.168.10.90/cam/realmonitor?channel=1&subtype=0&unicast=true&proto=Onvif
rtsp-dahua2: rtsp://admin:password@192.168.10.90/cam/realmonitor?channel=1&subtype=1
rtsp-tplink1: rtsp://admin:password@192.168.10.91/stream1
rtsp-tplink2: rtsp://admin:password@192.168.10.91/stream2
rtsp-reolink1: rtsp://admin:password@192.168.10.92/h264Preview_01_main
rtsp-reolink2: rtsp://admin:password@192.168.10.92/h264Preview_01_sub
rtsp-sonoff1: rtsp://admin:password@192.168.10.93/av_stream/ch0
rtsp-sonoff2: rtsp://admin:password@192.168.10.93/av_stream/ch1
# known RTMP sources
rtmp-reolink1: rtmp://192.168.10.92/bcs/channel0_main.bcs?channel=0&stream=0&user=admin&password=password
rtmp-reolink2: rtmp://192.168.10.92/bcs/channel0_sub.bcs?channel=0&stream=1&user=admin&password=password
rtmp-reolink3: rtmp://192.168.10.92/bcs/channel0_ext.bcs?channel=0&stream=1&user=admin&password=password
# known HTTP sources
http-reolink1: http://192.168.10.92/flv?port=1935&app=bcs&stream=channel0_main.bcs&user=admin&password=password
http-reolink2: http://192.168.10.92/flv?port=1935&app=bcs&stream=channel0_sub.bcs&user=admin&password=password
http-reolink3: http://192.168.10.92/flv?port=1935&app=bcs&stream=channel0_ext.bcs&user=admin&password=password
# known ONVIF sources
onvif-dahua1: onvif://admin:password@192.168.10.90?subtype=MediaProfile00000
onvif-dahua2: onvif://admin:password@192.168.10.90?subtype=MediaProfile00001
onvif-dahua3: onvif://admin:password@192.168.10.90?subtype=MediaProfile00000&snapshot
onvif-tplink1: onvif://admin:password@192.168.10.91:2020?subtype=profile_1
onvif-tplink2: onvif://admin:password@192.168.10.91:2020?subtype=profile_2
onvif-reolink1: onvif://admin:password@192.168.10.92:8000?subtype=000
onvif-reolink2: onvif://admin:password@192.168.10.92:8000?subtype=001
onvif-reolink3: onvif://admin:password@192.168.10.92:8000?subtype=000&snapshot
onvif-openipc1: onvif://admin:password@192.168.10.95:80?subtype=PROFILE_000
onvif-openipc2: onvif://admin:password@192.168.10.95:80?subtype=PROFILE_001
# some EXEC examples
exec-h264-pipe: exec:ffmpeg -re -i bbb.mp4 -c copy -f h264 -
exec-flv-pipe: exec:ffmpeg -re -i bbb.mp4 -c copy -f flv -
exec-mpegts-pipe: exec:ffmpeg -re -i bbb.mp4 -c copy -f mpegts -
exec-adts-pipe: exec:ffmpeg -re -i bbb.mp4 -c copy -f adts -
exec-mjpeg-pipe: exec:ffmpeg -re -i bbb.mp4 -c mjpeg -f mjpeg -
exec-hevc-pipe: exec:ffmpeg -re -i bbb.mp4 -c libx265 -preset superfast -tune zerolatency -f hevc -
exec-wav-pipe: exec:ffmpeg -re -i bbb.mp4 -c pcm_alaw -ar 8000 -ac 1 -f wav -
exec-y4m-pipe: exec:ffmpeg -re -i bbb.mp4 -c rawvideo -f yuv4mpegpipe -
exec-pcma-pipe: exec:ffmpeg -re -i numb.mp3 -c:a pcm_alaw -ar:a 8000 -ac:a 1 -f wav -
exec-pcmu-pipe: exec:ffmpeg -re -i numb.mp3 -c:a pcm_mulaw -ar:a 8000 -ac:a 1 -f wav -
exec-s16le-pipe: exec:ffmpeg -re -i numb.mp3 -c:a pcm_s16le -ar:a 16000 -ac:a 1 -f wav -
# some FFmpeg examples
ffmpeg-video-h264: ffmpeg:virtual?video#video=h264
ffmpeg-video-4K: ffmpeg:virtual?video&size=4K#video=h264
ffmpeg-video-10s: ffmpeg:virtual?video&duration=10#video=h264
ffmpeg-video-src2: ffmpeg:virtual?video=testsrc2&size=2K#video=h264
```
@@ -0,0 +1,166 @@
package streams
import (
"errors"
"strings"
"github.com/AlexxIT/go2rtc/pkg/core"
)
func (s *Stream) AddConsumer(cons core.Consumer) (err error) {
// support for multiple simultaneous pending from different consumers
consN := s.pending.Add(1) - 1
var prodErrors = make([]error, len(s.producers))
var prodMedias []*core.Media
var prodStarts []*Producer
// Step 1. Get consumer medias
consMedias := cons.GetMedias()
for _, consMedia := range consMedias {
log.Trace().Msgf("[streams] check cons=%d media=%s", consN, consMedia)
producers:
for prodN, prod := range s.producers {
// check for loop request, ex. `camera1: ffmpeg:camera1`
if info, ok := cons.(core.Info); ok && prod.url == info.GetSource() {
log.Trace().Msgf("[streams] skip cons=%d prod=%d", consN, prodN)
continue
}
if prodErrors[prodN] != nil {
log.Trace().Msgf("[streams] skip cons=%d prod=%d", consN, prodN)
continue
}
if err = prod.Dial(); err != nil {
log.Trace().Err(err).Msgf("[streams] dial cons=%d prod=%d", consN, prodN)
prodErrors[prodN] = err
continue
}
// Step 2. Get producer medias (not tracks yet)
for _, prodMedia := range prod.GetMedias() {
log.Trace().Msgf("[streams] check cons=%d prod=%d media=%s", consN, prodN, prodMedia)
prodMedias = append(prodMedias, prodMedia)
// Step 3. Match consumer/producer codecs list
prodCodec, consCodec := prodMedia.MatchMedia(consMedia)
if prodCodec == nil {
continue
}
var track *core.Receiver
switch prodMedia.Direction {
case core.DirectionRecvonly:
log.Trace().Msgf("[streams] match cons=%d <= prod=%d", consN, prodN)
// Step 4. Get recvonly track from producer
if track, err = prod.GetTrack(prodMedia, prodCodec); err != nil {
log.Info().Err(err).Msg("[streams] can't get track")
prodErrors[prodN] = err
continue
}
// Step 5. Add track to consumer
if err = cons.AddTrack(consMedia, consCodec, track); err != nil {
log.Info().Err(err).Msg("[streams] can't add track")
continue
}
case core.DirectionSendonly:
log.Trace().Msgf("[streams] match cons=%d => prod=%d", consN, prodN)
// Step 4. Get recvonly track from consumer (backchannel)
if track, err = cons.(core.Producer).GetTrack(consMedia, consCodec); err != nil {
log.Info().Err(err).Msg("[streams] can't get track")
continue
}
// Step 5. Add track to producer
if err = prod.AddTrack(prodMedia, prodCodec, track); err != nil {
log.Info().Err(err).Msg("[streams] can't add track")
prodErrors[prodN] = err
continue
}
}
prodStarts = append(prodStarts, prod)
if !consMedia.MatchAll() {
break producers
}
}
}
}
// stop producers if they don't have readers
if s.pending.Add(-1) == 0 {
s.stopProducers()
}
if len(prodStarts) == 0 {
return formatError(consMedias, prodMedias, prodErrors)
}
s.mu.Lock()
s.consumers = append(s.consumers, cons)
s.mu.Unlock()
// there may be duplicates, but that's not a problem
for _, prod := range prodStarts {
prod.start()
}
return nil
}
func formatError(consMedias, prodMedias []*core.Media, prodErrors []error) error {
// 1. Return errors if any not nil
var text string
for _, err := range prodErrors {
if err != nil {
text = appendString(text, err.Error())
}
}
if len(text) != 0 {
return errors.New("streams: " + text)
}
// 2. Return "codecs not matched"
if prodMedias != nil {
var prod, cons string
for _, media := range prodMedias {
if media.Direction == core.DirectionRecvonly {
for _, codec := range media.Codecs {
prod = appendString(prod, media.Kind+":"+codec.PrintName())
}
}
}
for _, media := range consMedias {
if media.Direction == core.DirectionSendonly {
for _, codec := range media.Codecs {
cons = appendString(cons, media.Kind+":"+codec.PrintName())
}
}
}
return errors.New("streams: codecs not matched: " + prod + " => " + cons)
}
// 3. Return unknown error
return errors.New("streams: unknown error")
}
func appendString(s, elem string) string {
if strings.Contains(s, elem) {
return s
}
if len(s) == 0 {
return elem
}
return s + ", " + elem
}
@@ -0,0 +1,181 @@
package streams
import (
"net/http"
"github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/app"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/creds"
"github.com/AlexxIT/go2rtc/pkg/probe"
)
func apiStreams(w http.ResponseWriter, r *http.Request) {
w = creds.SecretResponse(w)
query := r.URL.Query()
src := query.Get("src")
// without source - return all streams list
if src == "" && r.Method != "POST" {
api.ResponseJSON(w, streams)
return
}
// Not sure about all this API. Should be rewrited...
switch r.Method {
case "GET":
stream := Get(src)
if stream == nil {
http.Error(w, "", http.StatusNotFound)
return
}
cons := probe.Create("probe", query)
if len(cons.Medias) != 0 {
cons.WithRequest(r)
if err := stream.AddConsumer(cons); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
api.ResponsePrettyJSON(w, stream)
stream.RemoveConsumer(cons)
} else {
api.ResponsePrettyJSON(w, streams[src])
}
case "PUT":
name := query.Get("name")
if name == "" {
name = src
}
if _, err := New(name, query["src"]...); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := app.PatchConfig([]string{"streams", name}, query["src"]); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
}
case "PATCH":
name := query.Get("name")
if name == "" {
http.Error(w, "", http.StatusBadRequest)
return
}
// support {input} templates: https://github.com/AlexxIT/go2rtc#module-hass
if _, err := Patch(name, src); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
}
case "POST":
// with dst - redirect source to dst
if dst := query.Get("dst"); dst != "" {
if stream := Get(dst); stream != nil {
if err := Validate(src); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
} else if err = stream.Play(src); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
} else {
api.ResponseJSON(w, stream)
}
} else if stream = Get(src); stream != nil {
if err := Validate(dst); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
} else if err = stream.Publish(dst); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
} else {
http.Error(w, "", http.StatusNotFound)
}
} else {
http.Error(w, "", http.StatusBadRequest)
}
case "DELETE":
delete(streams, src)
if err := app.PatchConfig([]string{"streams", src}, nil); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
}
}
}
func apiStreamsDOT(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
dot := make([]byte, 0, 1024)
dot = append(dot, "digraph {\n"...)
if query.Has("src") {
for _, name := range query["src"] {
if stream := streams[name]; stream != nil {
dot = AppendDOT(dot, stream)
}
}
} else {
for _, stream := range streams {
dot = AppendDOT(dot, stream)
}
}
dot = append(dot, '}')
dot = []byte(creds.SecretString(string(dot)))
api.Response(w, dot, "text/vnd.graphviz")
}
func apiPreload(w http.ResponseWriter, r *http.Request) {
// GET - return all preloads
if r.Method == "GET" {
api.ResponseJSON(w, GetPreloads())
return
}
query := r.URL.Query()
src := query.Get("src")
switch r.Method {
case "PUT":
// it's safe to delete from map while iterating
for k := range query {
switch k {
case core.KindVideo, core.KindAudio, "microphone":
default:
delete(query, k)
}
}
rawQuery := query.Encode()
if err := AddPreload(src, rawQuery); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err := app.PatchConfig([]string{"preload", src}, rawQuery); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
case "DELETE":
if err := DelPreload(src); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err := app.PatchConfig([]string{"preload", src}, nil); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
default:
http.Error(w, "", http.StatusMethodNotAllowed)
}
}
func apiSchemes(w http.ResponseWriter, r *http.Request) {
api.ResponseJSON(w, SupportedSchemes())
}
@@ -0,0 +1,66 @@
package streams
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/stretchr/testify/require"
)
func TestApiSchemes(t *testing.T) {
// Setup: Register some test handlers and redirects
HandleFunc("rtsp", func(url string) (core.Producer, error) { return nil, nil })
HandleFunc("rtmp", func(url string) (core.Producer, error) { return nil, nil })
RedirectFunc("http", func(url string) (string, error) { return "", nil })
t.Run("GET request returns schemes", func(t *testing.T) {
req := httptest.NewRequest("GET", "/api/schemes", nil)
w := httptest.NewRecorder()
apiSchemes(w, req)
require.Equal(t, http.StatusOK, w.Code)
require.Equal(t, "application/json", w.Header().Get("Content-Type"))
var schemes []string
err := json.Unmarshal(w.Body.Bytes(), &schemes)
require.NoError(t, err)
require.NotEmpty(t, schemes)
// Check that our test schemes are in the response
require.Contains(t, schemes, "rtsp")
require.Contains(t, schemes, "rtmp")
require.Contains(t, schemes, "http")
})
}
func TestApiSchemesNoDuplicates(t *testing.T) {
// Setup: Register a scheme in both handlers and redirects
HandleFunc("duplicate", func(url string) (core.Producer, error) { return nil, nil })
RedirectFunc("duplicate", func(url string) (string, error) { return "", nil })
req := httptest.NewRequest("GET", "/api/schemes", nil)
w := httptest.NewRecorder()
apiSchemes(w, req)
require.Equal(t, http.StatusOK, w.Code)
var schemes []string
err := json.Unmarshal(w.Body.Bytes(), &schemes)
require.NoError(t, err)
// Count occurrences of "duplicate"
count := 0
for _, scheme := range schemes {
if scheme == "duplicate" {
count++
}
}
// Should only appear once
require.Equal(t, 1, count, "scheme 'duplicate' should appear exactly once")
}
@@ -0,0 +1,176 @@
package streams
import (
"encoding/json"
"fmt"
"strings"
)
func AppendDOT(dot []byte, stream *Stream) []byte {
for _, prod := range stream.producers {
if prod.conn == nil {
continue
}
c, err := marshalConn(prod.conn)
if err != nil {
continue
}
dot = c.appendDOT(dot, "producer")
}
for _, cons := range stream.consumers {
c, err := marshalConn(cons)
if err != nil {
continue
}
dot = c.appendDOT(dot, "consumer")
}
return dot
}
func marshalConn(v any) (*conn, error) {
b, err := json.Marshal(v)
if err != nil {
return nil, err
}
var c conn
if err = json.Unmarshal(b, &c); err != nil {
return nil, err
}
return &c, nil
}
const bytesK = "KMGTP"
func humanBytes(i int) string {
if i < 1000 {
return fmt.Sprintf("%d B", i)
}
f := float64(i) / 1000
var n uint8
for f >= 1000 && n < 5 {
f /= 1000
n++
}
return fmt.Sprintf("%.2f %cB", f, bytesK[n])
}
type node struct {
ID uint32 `json:"id"`
Codec map[string]any `json:"codec"`
Parent uint32 `json:"parent"`
Childs []uint32 `json:"childs"`
Bytes int `json:"bytes"`
//Packets uint32 `json:"packets"`
//Drops uint32 `json:"drops"`
}
var codecKeys = []string{"codec_name", "sample_rate", "channels", "profile", "level"}
func (n *node) name() string {
if name, ok := n.Codec["codec_name"].(string); ok {
return name
}
return "unknown"
}
func (n *node) codec() []byte {
b := make([]byte, 0, 128)
for _, k := range codecKeys {
if v := n.Codec[k]; v != nil {
b = fmt.Appendf(b, "%s=%v\n", k, v)
}
}
if l := len(b); l > 0 {
return b[:l-1]
}
return b
}
func (n *node) appendDOT(dot []byte, group string) []byte {
dot = fmt.Appendf(dot, "%d [group=%s, label=%q, title=%q];\n", n.ID, group, n.name(), n.codec())
//for _, sink := range n.Childs {
// dot = fmt.Appendf(dot, "%d -> %d;\n", n.ID, sink)
//}
return dot
}
type conn struct {
ID uint32 `json:"id"`
FormatName string `json:"format_name"`
Protocol string `json:"protocol"`
RemoteAddr string `json:"remote_addr"`
Source string `json:"source"`
URL string `json:"url"`
UserAgent string `json:"user_agent"`
Receivers []node `json:"receivers"`
Senders []node `json:"senders"`
BytesRecv int `json:"bytes_recv"`
BytesSend int `json:"bytes_send"`
}
func (c *conn) appendDOT(dot []byte, group string) []byte {
host := c.host()
dot = fmt.Appendf(dot, "%s [group=host];\n", host)
dot = fmt.Appendf(dot, "%d [group=%s, label=%q, title=%q];\n", c.ID, group, c.FormatName, c.label())
if group == "producer" {
dot = fmt.Appendf(dot, "%s -> %d [label=%q];\n", host, c.ID, humanBytes(c.BytesRecv))
} else {
dot = fmt.Appendf(dot, "%d -> %s [label=%q];\n", c.ID, host, humanBytes(c.BytesSend))
}
for _, recv := range c.Receivers {
dot = fmt.Appendf(dot, "%d -> %d [label=%q];\n", c.ID, recv.ID, humanBytes(recv.Bytes))
dot = recv.appendDOT(dot, "node")
}
for _, send := range c.Senders {
dot = fmt.Appendf(dot, "%d -> %d [label=%q];\n", send.Parent, c.ID, humanBytes(send.Bytes))
//dot = fmt.Appendf(dot, "%d -> %d [label=%q];\n", send.ID, c.ID, humanBytes(send.Bytes))
//dot = send.appendDOT(dot, "node")
}
return dot
}
func (c *conn) host() (s string) {
if c.Protocol == "pipe" {
return "127.0.0.1"
}
if s = c.RemoteAddr; s == "" {
return "unknown"
}
if i := strings.Index(s, "forwarded"); i > 0 {
s = s[i+10:]
}
if s[0] == '[' {
if i := strings.Index(s, "]"); i > 0 {
return s[1:i]
}
}
if i := strings.IndexAny(s, " ,:"); i > 0 {
return s[:i]
}
return
}
func (c *conn) label() string {
var sb strings.Builder
sb.WriteString("format_name=" + c.FormatName)
if c.Protocol != "" {
sb.WriteString("\nprotocol=" + c.Protocol)
}
if c.Source != "" {
sb.WriteString("\nsource=" + c.Source)
}
if c.URL != "" {
sb.WriteString("\nurl=" + c.URL)
}
if c.UserAgent != "" {
sb.WriteString("\nuser_agent=" + c.UserAgent)
}
// escape quotes https://github.com/AlexxIT/go2rtc/issues/1603
return strings.ReplaceAll(sb.String(), `"`, `'`)
}
@@ -0,0 +1,134 @@
package streams
import (
"errors"
"regexp"
"strings"
"github.com/AlexxIT/go2rtc/pkg/core"
)
type Handler func(source string) (core.Producer, error)
var handlers = map[string]Handler{}
func HandleFunc(scheme string, handler Handler) {
handlers[scheme] = handler
}
func SupportedSchemes() []string {
uniqueKeys := make(map[string]struct{}, len(handlers)+len(redirects))
for scheme := range handlers {
uniqueKeys[scheme] = struct{}{}
}
for scheme := range redirects {
uniqueKeys[scheme] = struct{}{}
}
resultKeys := make([]string, 0, len(uniqueKeys))
for key := range uniqueKeys {
resultKeys = append(resultKeys, key)
}
return resultKeys
}
func HasProducer(url string) bool {
if i := strings.IndexByte(url, ':'); i > 0 {
scheme := url[:i]
if _, ok := handlers[scheme]; ok {
return true
}
if _, ok := redirects[scheme]; ok {
return true
}
}
return false
}
func GetProducer(url string) (core.Producer, error) {
if i := strings.IndexByte(url, ':'); i > 0 {
scheme := url[:i]
if redirect, ok := redirects[scheme]; ok {
location, err := redirect(url)
if err != nil {
return nil, err
}
if location != "" {
return GetProducer(location)
}
}
if handler, ok := handlers[scheme]; ok {
return handler(url)
}
}
return nil, errors.New("streams: unsupported scheme: " + url)
}
// Redirect can return: location URL or error or empty URL and error
type Redirect func(url string) (string, error)
var redirects = map[string]Redirect{}
func RedirectFunc(scheme string, redirect Redirect) {
redirects[scheme] = redirect
}
func Location(url string) (string, error) {
if i := strings.IndexByte(url, ':'); i > 0 {
scheme := url[:i]
if redirect, ok := redirects[scheme]; ok {
return redirect(url)
}
}
return "", nil
}
// TODO: rework
type ConsumerHandler func(url string) (core.Consumer, func(), error)
var consumerHandlers = map[string]ConsumerHandler{}
func HandleConsumerFunc(scheme string, handler ConsumerHandler) {
consumerHandlers[scheme] = handler
}
func GetConsumer(url string) (core.Consumer, func(), error) {
if i := strings.IndexByte(url, ':'); i > 0 {
scheme := url[:i]
if handler, ok := consumerHandlers[scheme]; ok {
return handler(url)
}
}
return nil, nil, errors.New("streams: unsupported scheme: " + url)
}
var insecure = map[string]bool{}
func MarkInsecure(scheme string) {
insecure[scheme] = true
}
var sanitize = regexp.MustCompile(`\s`)
func Validate(source string) error {
// TODO: Review the entire logic of insecure sources
if i := strings.IndexByte(source, ':'); i > 0 {
if insecure[source[:i]] {
return errors.New("streams: source from insecure producer")
}
}
if sanitize.MatchString(source) {
return errors.New("streams: source with spaces may be insecure")
}
return nil
}
@@ -0,0 +1,22 @@
package streams
import (
"net/url"
"strings"
)
func ParseQuery(s string) url.Values {
if len(s) == 0 {
return nil
}
params := url.Values{}
for _, key := range strings.Split(s, "#") {
var value string
i := strings.IndexByte(key, '=')
if i > 0 {
key, value = key[:i], key[i+1:]
}
params[key] = append(params[key], value)
}
return params
}
@@ -0,0 +1,163 @@
package streams
import (
"errors"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
)
func (s *Stream) Play(urlOrProd any) error {
s.mu.Lock()
for _, producer := range s.producers {
if producer.state == stateInternal && producer.conn != nil {
_ = producer.conn.Stop()
}
}
s.mu.Unlock()
var source string
var src core.Producer
switch urlOrProd.(type) {
case string:
if source = urlOrProd.(string); source == "" {
return nil
}
case core.Producer:
src = urlOrProd.(core.Producer)
}
for _, producer := range s.producers {
if producer.conn == nil {
continue
}
cons, ok := producer.conn.(core.Consumer)
if !ok {
continue
}
if src == nil {
var err error
if src, err = GetProducer(source); err != nil {
return err
}
}
if !matchMedia(src, cons) {
continue
}
s.AddInternalProducer(src)
go func() {
_ = src.Start()
s.RemoveProducer(src)
}()
return nil
}
for _, producer := range s.producers {
// start new client
dst, err := GetProducer(producer.url)
if err != nil {
continue
}
// check if client support consumer interface
cons, ok := dst.(core.Consumer)
if !ok {
_ = dst.Stop()
continue
}
// start new producer
if src == nil {
if src, err = GetProducer(source); err != nil {
return err
}
}
if !matchMedia(src, cons) {
_ = dst.Stop()
continue
}
s.AddInternalProducer(src)
s.AddInternalConsumer(cons)
go func() {
_ = dst.Start()
_ = src.Stop()
s.RemoveInternalConsumer(cons)
}()
go func() {
_ = src.Start()
// little timeout before stop dst, so the buffer can be transferred
time.Sleep(time.Second)
_ = dst.Stop()
s.RemoveProducer(src)
}()
return nil
}
return errors.New("can't find consumer")
}
func (s *Stream) AddInternalProducer(conn core.Producer) {
producer := &Producer{conn: conn, state: stateInternal, url: "internal"}
s.mu.Lock()
s.producers = append(s.producers, producer)
s.mu.Unlock()
}
func (s *Stream) AddInternalConsumer(conn core.Consumer) {
s.mu.Lock()
s.consumers = append(s.consumers, conn)
s.mu.Unlock()
}
func (s *Stream) RemoveInternalConsumer(conn core.Consumer) {
s.mu.Lock()
for i, consumer := range s.consumers {
if consumer == conn {
s.consumers = append(s.consumers[:i], s.consumers[i+1:]...)
break
}
}
s.mu.Unlock()
}
func matchMedia(prod core.Producer, cons core.Consumer) bool {
for _, consMedia := range cons.GetMedias() {
for _, prodMedia := range prod.GetMedias() {
if prodMedia.Direction != core.DirectionRecvonly {
continue
}
prodCodec, consCodec := prodMedia.MatchMedia(consMedia)
if prodCodec == nil {
continue
}
track, err := prod.GetTrack(prodMedia, prodCodec)
if err != nil {
log.Warn().Err(err).Msg("[streams] can't get track")
continue
}
if err = cons.AddTrack(consMedia, consCodec, track); err != nil {
log.Warn().Err(err).Msg("[streams] can't add track")
continue
}
return true
}
}
return false
}
@@ -0,0 +1,69 @@
package streams
import (
"fmt"
"maps"
"net/url"
"sync"
"github.com/AlexxIT/go2rtc/pkg/probe"
)
type Preload struct {
stream *Stream // Don't include the stream in JSON to avoid leaking secrets.
Cons *probe.Probe `json:"consumer"`
Query string `json:"query"`
}
var preloads = map[string]*Preload{}
var preloadsMu sync.Mutex
func AddPreload(name, rawQuery string) error {
if rawQuery == "" {
rawQuery = "video&audio"
}
query, err := url.ParseQuery(rawQuery)
if err != nil {
return err
}
preloadsMu.Lock()
defer preloadsMu.Unlock()
if p := preloads[name]; p != nil {
p.stream.RemoveConsumer(p.Cons)
}
stream := Get(name)
if stream == nil {
return fmt.Errorf("streams: stream not found: %s", name)
}
cons := probe.Create("preload", query)
if err = stream.AddConsumer(cons); err != nil {
return err
}
preloads[name] = &Preload{stream: stream, Cons: cons, Query: rawQuery}
return nil
}
func DelPreload(name string) error {
preloadsMu.Lock()
defer preloadsMu.Unlock()
if p := preloads[name]; p != nil {
p.stream.RemoveConsumer(p.Cons)
delete(preloads, name)
return nil
}
return fmt.Errorf("streams: preload not found: %s", name)
}
func GetPreloads() map[string]*Preload {
preloadsMu.Lock()
defer preloadsMu.Unlock()
return maps.Clone(preloads)
}
@@ -0,0 +1,270 @@
package streams
import (
"encoding/json"
"errors"
"strings"
"sync"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
)
type state byte
const (
stateNone state = iota
stateMedias
stateTracks
stateStart
stateExternal
stateInternal
)
type Producer struct {
core.Listener
url string
template string
conn core.Producer
receivers []*core.Receiver
senders []*core.Receiver
state state
mu sync.Mutex
workerID int
}
const SourceTemplate = "{input}"
func NewProducer(source string) *Producer {
if strings.Contains(source, SourceTemplate) {
return &Producer{template: source}
}
return &Producer{url: source}
}
func (p *Producer) SetSource(s string) {
if p.template == "" {
p.url = s
} else {
p.url = strings.Replace(p.template, SourceTemplate, s, 1)
}
}
func (p *Producer) Dial() error {
p.mu.Lock()
defer p.mu.Unlock()
if p.state == stateNone {
conn, err := GetProducer(p.url)
if err != nil {
return err
}
p.conn = conn
p.state = stateMedias
}
return nil
}
func (p *Producer) GetMedias() []*core.Media {
p.mu.Lock()
defer p.mu.Unlock()
if p.conn == nil {
return nil
}
return p.conn.GetMedias()
}
func (p *Producer) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
p.mu.Lock()
defer p.mu.Unlock()
if p.state == stateNone {
return nil, errors.New("get track from none state")
}
for _, track := range p.receivers {
if track.Codec == codec {
return track, nil
}
}
track, err := p.conn.GetTrack(media, codec)
if err != nil {
return nil, err
}
p.receivers = append(p.receivers, track)
if p.state == stateMedias {
p.state = stateTracks
}
return track, nil
}
func (p *Producer) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error {
p.mu.Lock()
defer p.mu.Unlock()
if p.state == stateNone {
return errors.New("add track from none state")
}
if err := p.conn.(core.Consumer).AddTrack(media, codec, track); err != nil {
return err
}
p.senders = append(p.senders, track)
if p.state == stateMedias {
p.state = stateTracks
}
return nil
}
func (p *Producer) MarshalJSON() ([]byte, error) {
if conn := p.conn; conn != nil {
return json.Marshal(conn)
}
info := map[string]string{"url": p.url}
return json.Marshal(info)
}
// internals
func (p *Producer) start() {
p.mu.Lock()
defer p.mu.Unlock()
if p.state != stateTracks {
return
}
log.Debug().Msgf("[streams] start producer url=%s", p.url)
p.state = stateStart
p.workerID++
go p.worker(p.conn, p.workerID)
}
func (p *Producer) worker(conn core.Producer, workerID int) {
if err := conn.Start(); err != nil {
p.mu.Lock()
closed := p.workerID != workerID
p.mu.Unlock()
if closed {
return
}
log.Warn().Err(err).Str("url", p.url).Caller().Send()
}
p.reconnect(workerID, 0)
}
func (p *Producer) reconnect(workerID, retry int) {
p.mu.Lock()
defer p.mu.Unlock()
if p.workerID != workerID {
log.Trace().Msgf("[streams] stop reconnect url=%s", p.url)
return
}
log.Debug().Msgf("[streams] retry=%d to url=%s", retry, p.url)
conn, err := GetProducer(p.url)
if err != nil {
log.Debug().Msgf("[streams] producer=%s", err)
timeout := time.Minute
if retry < 5 {
timeout = time.Second
} else if retry < 10 {
timeout = time.Second * 5
} else if retry < 20 {
timeout = time.Second * 10
}
time.AfterFunc(timeout, func() {
p.reconnect(workerID, retry+1)
})
return
}
for _, media := range conn.GetMedias() {
switch media.Direction {
case core.DirectionRecvonly:
for i, receiver := range p.receivers {
codec := media.MatchCodec(receiver.Codec)
if codec == nil {
continue
}
track, err := conn.GetTrack(media, codec)
if err != nil {
continue
}
receiver.Replace(track)
p.receivers[i] = track
break
}
case core.DirectionSendonly:
for _, sender := range p.senders {
codec := media.MatchCodec(sender.Codec)
if codec == nil {
continue
}
_ = conn.(core.Consumer).AddTrack(media, codec, sender)
}
}
}
// stop previous connection after moving tracks (fix ghost exec/ffmpeg)
_ = p.conn.Stop()
// swap connections
p.conn = conn
go p.worker(conn, workerID)
}
func (p *Producer) stop() {
p.mu.Lock()
defer p.mu.Unlock()
switch p.state {
case stateExternal:
log.Trace().Msgf("[streams] skip stop external producer")
return
case stateNone:
log.Trace().Msgf("[streams] skip stop none producer")
return
case stateStart:
p.workerID++
}
log.Debug().Msgf("[streams] stop producer url=%s", p.url)
if p.conn != nil {
_ = p.conn.Stop()
p.conn = nil
}
p.state = stateNone
p.receivers = nil
p.senders = nil
}
@@ -0,0 +1,38 @@
package streams
import "time"
func (s *Stream) Publish(url string) error {
cons, run, err := GetConsumer(url)
if err != nil {
return err
}
if err = s.AddConsumer(cons); err != nil {
return err
}
go func() {
run()
s.RemoveConsumer(cons)
// TODO: more smart retry
time.Sleep(5 * time.Second)
_ = s.Publish(url)
}()
return nil
}
func Publish(stream *Stream, destination any) {
switch v := destination.(type) {
case string:
if err := stream.Publish(v); err != nil {
log.Error().Err(err).Caller().Send()
}
case []any:
for _, v := range v {
Publish(stream, v)
}
}
}
@@ -0,0 +1,130 @@
package streams
import (
"encoding/json"
"sync"
"sync/atomic"
"github.com/AlexxIT/go2rtc/pkg/core"
)
type Stream struct {
producers []*Producer
consumers []core.Consumer
mu sync.Mutex
pending atomic.Int32
}
func NewStream(source any) *Stream {
switch source := source.(type) {
case string:
return &Stream{
producers: []*Producer{NewProducer(source)},
}
case []string:
s := new(Stream)
for _, str := range source {
s.producers = append(s.producers, NewProducer(str))
}
return s
case []any:
s := new(Stream)
for _, src := range source {
str, ok := src.(string)
if !ok {
log.Error().Msgf("[stream] NewStream: Expected string, got %v", src)
continue
}
s.producers = append(s.producers, NewProducer(str))
}
return s
case map[string]any:
return NewStream(source["url"])
case nil:
return new(Stream)
default:
panic(core.Caller())
}
}
func (s *Stream) Sources() []string {
sources := make([]string, 0, len(s.producers))
for _, prod := range s.producers {
sources = append(sources, prod.url)
}
return sources
}
func (s *Stream) SetSource(source string) {
for _, prod := range s.producers {
prod.SetSource(source)
}
}
func (s *Stream) RemoveConsumer(cons core.Consumer) {
_ = cons.Stop()
s.mu.Lock()
for i, consumer := range s.consumers {
if consumer == cons {
s.consumers = append(s.consumers[:i], s.consumers[i+1:]...)
break
}
}
s.mu.Unlock()
s.stopProducers()
}
func (s *Stream) AddProducer(prod core.Producer) {
producer := &Producer{conn: prod, state: stateExternal, url: "external"}
s.mu.Lock()
s.producers = append(s.producers, producer)
s.mu.Unlock()
}
func (s *Stream) RemoveProducer(prod core.Producer) {
s.mu.Lock()
for i, producer := range s.producers {
if producer.conn == prod {
s.producers = append(s.producers[:i], s.producers[i+1:]...)
break
}
}
s.mu.Unlock()
}
func (s *Stream) stopProducers() {
if s.pending.Load() > 0 {
log.Trace().Msg("[streams] skip stop pending producer")
return
}
s.mu.Lock()
producers:
for _, producer := range s.producers {
for _, track := range producer.receivers {
if len(track.Senders()) > 0 {
continue producers
}
}
for _, track := range producer.senders {
if len(track.Senders()) > 0 {
continue producers
}
}
producer.stop()
}
s.mu.Unlock()
}
func (s *Stream) MarshalJSON() ([]byte, error) {
var info = struct {
Producers []*Producer `json:"producers"`
Consumers []core.Consumer `json:"consumers"`
}{
Producers: s.producers,
Consumers: s.consumers,
}
return json.Marshal(info)
}
@@ -0,0 +1,42 @@
package streams
import (
"net/url"
"testing"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/stretchr/testify/require"
)
func TestRecursion(t *testing.T) {
// create stream with some source
stream1, err := New("from_yaml", "does_not_matter")
require.NoError(t, err)
require.Len(t, streams, 1)
// ask another unnamed stream that links go2rtc
query, err := url.ParseQuery("src=rtsp://localhost:8554/from_yaml?video")
require.NoError(t, err)
stream2, err := GetOrPatch(query)
require.NoError(t, err)
// check stream is same
require.Equal(t, stream1, stream2)
// check stream urls is same
require.Equal(t, stream1.producers[0].url, stream2.producers[0].url)
require.Len(t, streams, 2)
}
func TestTempate(t *testing.T) {
HandleFunc("rtsp", func(url string) (core.Producer, error) { return nil, nil }) // bypass HasProducer
// config from yaml
stream1, err := New("camera.from_hass", "ffmpeg:{input}#video=copy")
require.NoError(t, err)
// request from hass
stream2, err := Patch("camera.from_hass", "rtsp://example.com")
require.NoError(t, err)
require.Equal(t, stream1, stream2)
require.Equal(t, "ffmpeg:rtsp://example.com#video=copy", stream1.producers[0].url)
}
@@ -0,0 +1,176 @@
package streams
import (
"errors"
"net/url"
"sync"
"time"
"github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/app"
"github.com/rs/zerolog"
)
func Init() {
var cfg struct {
Streams map[string]any `yaml:"streams"`
Publish map[string]any `yaml:"publish"`
Preload map[string]string `yaml:"preload"`
}
app.LoadConfig(&cfg)
log = app.GetLogger("streams")
for name, item := range cfg.Streams {
streams[name] = NewStream(item)
}
api.HandleFunc("api/streams", apiStreams)
api.HandleFunc("api/streams.dot", apiStreamsDOT)
api.HandleFunc("api/preload", apiPreload)
api.HandleFunc("api/schemes", apiSchemes)
if cfg.Publish == nil && cfg.Preload == nil {
return
}
time.AfterFunc(time.Second, func() {
// range for nil map is OK
for name, dst := range cfg.Publish {
if stream := Get(name); stream != nil {
Publish(stream, dst)
}
}
for name, rawQuery := range cfg.Preload {
if err := AddPreload(name, rawQuery); err != nil {
log.Error().Err(err).Caller().Send()
}
}
})
}
func New(name string, sources ...string) (*Stream, error) {
for _, source := range sources {
if !HasProducer(source) {
return nil, errors.New("streams: source not supported")
}
if err := Validate(source); err != nil {
return nil, err
}
}
stream := NewStream(sources)
streamsMu.Lock()
streams[name] = stream
streamsMu.Unlock()
return stream, nil
}
func Patch(name string, source string) (*Stream, error) {
streamsMu.Lock()
defer streamsMu.Unlock()
// check if source links to some stream name from go2rtc
if u, err := url.Parse(source); err == nil && u.Scheme == "rtsp" && len(u.Path) > 1 {
rtspName := u.Path[1:]
if stream, ok := streams[rtspName]; ok {
if streams[name] != stream {
// link (alias) streams[name] to streams[rtspName]
streams[name] = stream
}
return stream, nil
}
}
if stream, ok := streams[source]; ok {
if name != source {
// link (alias) streams[name] to streams[source]
streams[name] = stream
}
return stream, nil
}
// check if src has supported scheme
if !HasProducer(source) {
return nil, errors.New("streams: source not supported")
}
if err := Validate(source); err != nil {
return nil, err
}
// check an existing stream with this name
if stream, ok := streams[name]; ok {
stream.SetSource(source)
return stream, nil
}
// create new stream with this name
stream := NewStream(source)
streams[name] = stream
return stream, nil
}
func GetOrPatch(query url.Values) (*Stream, error) {
// check if src param exists
source := query.Get("src")
if source == "" {
return nil, errors.New("streams: source empty")
}
// check if src is stream name
if stream := Get(source); stream != nil {
return stream, nil
}
// check if name param provided
if name := query.Get("name"); name != "" {
return Patch(name, source)
}
// return new stream with src as name
return Patch(source, source)
}
var log zerolog.Logger
// streams map
var streams = map[string]*Stream{}
var streamsMu sync.Mutex
func Get(name string) *Stream {
streamsMu.Lock()
defer streamsMu.Unlock()
return streams[name]
}
func Delete(name string) {
streamsMu.Lock()
defer streamsMu.Unlock()
delete(streams, name)
}
func GetAllNames() []string {
streamsMu.Lock()
names := make([]string, 0, len(streams))
for name := range streams {
names = append(names, name)
}
streamsMu.Unlock()
return names
}
func GetAllSources() map[string][]string {
streamsMu.Lock()
sources := make(map[string][]string, len(streams))
for name, stream := range streams {
sources[name] = stream.Sources()
}
streamsMu.Unlock()
return sources
}