2023-12-17 15:54:35 +02:00
|
|
|
// mautrix-signal - A Matrix-signal puppeting bridge.
|
|
|
|
|
// Copyright (C) 2023 Scott Weber
|
|
|
|
|
//
|
|
|
|
|
// This program is free software: you can redistribute it and/or modify
|
|
|
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
|
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
|
|
|
// (at your option) any later version.
|
|
|
|
|
//
|
|
|
|
|
// This program is distributed in the hope that it will be useful,
|
|
|
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
|
// GNU Affero General Public License for more details.
|
|
|
|
|
//
|
|
|
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
|
|
|
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
|
|
2023-05-31 16:39:09 -04:00
|
|
|
package web
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"encoding/base64"
|
2025-11-24 17:57:18 +02:00
|
|
|
"encoding/json"
|
2023-09-30 14:39:26 +03:00
|
|
|
"errors"
|
|
|
|
|
"fmt"
|
2023-05-31 16:39:09 -04:00
|
|
|
"net/http"
|
2025-01-15 23:40:50 +02:00
|
|
|
"net/url"
|
2023-09-06 00:47:08 -04:00
|
|
|
"strings"
|
2024-12-17 12:41:35 +02:00
|
|
|
"sync"
|
2025-04-29 23:41:49 +03:00
|
|
|
"sync/atomic"
|
2023-05-31 16:39:09 -04:00
|
|
|
"time"
|
|
|
|
|
|
2024-09-13 12:27:26 +03:00
|
|
|
"github.com/coder/websocket"
|
2023-12-26 19:08:02 -07:00
|
|
|
"github.com/rs/zerolog"
|
2025-03-25 12:52:13 +02:00
|
|
|
"go.mau.fi/util/exsync"
|
2023-09-30 14:39:26 +03:00
|
|
|
|
2023-05-31 16:39:09 -04:00
|
|
|
signalpb "go.mau.fi/mautrix-signal/pkg/signalmeow/protobuf"
|
|
|
|
|
"go.mau.fi/mautrix-signal/pkg/signalmeow/wspb"
|
|
|
|
|
)
|
|
|
|
|
|
2026-01-09 12:08:09 +00:00
|
|
|
// Keepalive tuning for the Signal websocket. These are variables rather than
// constants, so they can be reassigned at runtime.
var (
	// WebsocketPingInterval is the period of the keepalive ticker: one ping
	// is attempted per tick while a connection is up.
	WebsocketPingInterval = 30 * time.Second
	// WebsocketPingTimeout bounds how long a single keepalive ping may take.
	WebsocketPingTimeout = 20 * time.Second
	// WebsocketPingTimeoutLimit is the number of consecutive failed pings
	// after which the websocket is closed.
	WebsocketPingTimeoutLimit = 5
)
|
|
|
|
|
|
2023-05-31 16:39:09 -04:00
|
|
|
// URL paths of the Signal websocket endpoints.
const (
	// WebsocketProvisioningPath is the path of the provisioning websocket.
	WebsocketProvisioningPath = "/v1/websocket/provisioning/"
	// WebsocketPath is the path of the main websocket.
	WebsocketPath = "/v1/websocket/"
)
|
|
|
|
|
|
|
|
|
|
// SimpleResponse is the reply a request handler produces for an incoming
// websocket request.
type SimpleResponse struct {
	// Status is the status code to answer the request with.
	Status int
	// WriteCallback is an optional hook taking a timestamp.
	// NOTE(review): presumably invoked by the write loop with the time the
	// response was written; the call site is outside this section - confirm.
	WriteCallback func(time.Time)
}
|
|
|
|
|
// RequestHandlerFunc handles one incoming websocket request. A non-nil
// *SimpleResponse is queued as the reply to that request; a non-nil error is
// logged and no reply is queued.
type RequestHandlerFunc func(
	ctx context.Context,
	request *signalpb.WebSocketRequestMessage,
) (*SimpleResponse, error)
|
|
|
|
|
|
|
|
|
|
type SignalWebsocket struct {
|
2025-04-29 23:41:49 +03:00
|
|
|
ws atomic.Pointer[websocket.Conn]
|
2025-01-15 23:40:50 +02:00
|
|
|
basicAuth *url.Userinfo
|
2023-08-22 11:54:35 -04:00
|
|
|
sendChannel chan SignalWebsocketSendMessage
|
|
|
|
|
statusChannel chan SignalWebsocketConnectionStatus
|
2025-03-25 12:52:13 +02:00
|
|
|
closeLock sync.RWMutex
|
|
|
|
|
closeEvt *exsync.Event
|
2025-04-29 23:41:49 +03:00
|
|
|
closeCalled atomic.Bool
|
|
|
|
|
cancel atomic.Pointer[context.CancelFunc]
|
2026-01-15 14:08:00 +02:00
|
|
|
cancelConn atomic.Pointer[context.CancelCauseFunc]
|
2023-05-31 16:39:09 -04:00
|
|
|
}
|
|
|
|
|
|
2025-01-15 23:40:50 +02:00
|
|
|
func NewSignalWebsocket(basicAuth *url.Userinfo) *SignalWebsocket {
|
2023-06-07 01:36:32 -04:00
|
|
|
return &SignalWebsocket{
|
2023-08-22 11:54:35 -04:00
|
|
|
basicAuth: basicAuth,
|
|
|
|
|
sendChannel: make(chan SignalWebsocketSendMessage),
|
|
|
|
|
statusChannel: make(chan SignalWebsocketConnectionStatus),
|
2025-03-25 12:52:13 +02:00
|
|
|
closeEvt: exsync.NewEvent(),
|
2023-06-07 01:36:32 -04:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2023-08-22 11:54:35 -04:00
|
|
|
// SignalWebsocketConnectionEvent identifies a change in the state of the
// websocket connection.
type SignalWebsocketConnectionEvent int

const (
	// SignalWebsocketConnectionEventConnecting is the zero value.
	SignalWebsocketConnectionEventConnecting SignalWebsocketConnectionEvent = iota // Implicit to catch default value (0), doesn't get sent
	// SignalWebsocketConnectionEventConnected: a websocket was opened.
	SignalWebsocketConnectionEventConnected
	// SignalWebsocketConnectionEventDisconnected: the connection was lost
	// or could not be opened.
	SignalWebsocketConnectionEventDisconnected
	// SignalWebsocketConnectionEventLoggedOut: the server rejected the
	// credentials.
	SignalWebsocketConnectionEventLoggedOut
	// SignalWebsocketConnectionEventError: opening the websocket keeps
	// failing.
	SignalWebsocketConnectionEventError
	// SignalWebsocketConnectionEventFatalError: opening the websocket
	// failed in a way that is not retried.
	SignalWebsocketConnectionEventFatalError
	// SignalWebsocketConnectionEventCleanShutdown: the connection ended
	// without an error.
	SignalWebsocketConnectionEventCleanShutdown
)

// signalWebsocketConnectionEventNames maps each
// SignalWebsocketConnectionEvent to its string representation.
var signalWebsocketConnectionEventNames = map[SignalWebsocketConnectionEvent]string{
	SignalWebsocketConnectionEventConnecting:    "SignalWebsocketConnectionEventConnecting",
	SignalWebsocketConnectionEventConnected:     "SignalWebsocketConnectionEventConnected",
	SignalWebsocketConnectionEventDisconnected:  "SignalWebsocketConnectionEventDisconnected",
	SignalWebsocketConnectionEventLoggedOut:     "SignalWebsocketConnectionEventLoggedOut",
	SignalWebsocketConnectionEventError:         "SignalWebsocketConnectionEventError",
	SignalWebsocketConnectionEventFatalError:    "SignalWebsocketConnectionEventFatalError",
	SignalWebsocketConnectionEventCleanShutdown: "SignalWebsocketConnectionEventCleanShutdown",
}

// String implements fmt.Stringer. Known events return their constant name.
// Any other value is rendered as "SignalWebsocketConnectionEvent(N)" so that
// an unexpected value shows up in logs instead of as an empty string.
func (s SignalWebsocketConnectionEvent) String() string {
	if name, ok := signalWebsocketConnectionEventNames[s]; ok {
		return name
	}
	return fmt.Sprintf("SignalWebsocketConnectionEvent(%d)", int(s))
}
|
|
|
|
|
|
2023-08-22 11:54:35 -04:00
|
|
|
type SignalWebsocketConnectionStatus struct {
|
|
|
|
|
Event SignalWebsocketConnectionEvent
|
|
|
|
|
Err error
|
2023-06-07 01:36:32 -04:00
|
|
|
}
|
|
|
|
|
|
2023-12-30 20:45:12 +01:00
|
|
|
func (s *SignalWebsocket) IsConnected() bool {
|
2025-04-29 23:41:49 +03:00
|
|
|
return s.ws.Load() != nil
|
2023-12-30 20:45:12 +01:00
|
|
|
}
|
|
|
|
|
|
2025-04-29 14:12:08 +03:00
|
|
|
func (s *SignalWebsocket) Close() (err error) {
|
|
|
|
|
if s == nil {
|
|
|
|
|
return nil
|
2023-05-31 16:39:09 -04:00
|
|
|
}
|
2025-04-29 14:12:08 +03:00
|
|
|
|
2025-04-29 23:41:49 +03:00
|
|
|
s.closeCalled.Store(true)
|
|
|
|
|
if ws := s.ws.Swap(nil); ws != nil {
|
|
|
|
|
err = ws.Close(websocket.StatusNormalClosure, "")
|
2025-04-29 14:12:08 +03:00
|
|
|
}
|
2025-04-29 23:41:49 +03:00
|
|
|
if cancelLoop := s.cancel.Swap(nil); cancelLoop != nil {
|
|
|
|
|
(*cancelLoop)()
|
2025-04-29 14:12:08 +03:00
|
|
|
}
|
|
|
|
|
<-s.closeEvt.GetChan()
|
|
|
|
|
return err
|
2023-06-07 01:36:32 -04:00
|
|
|
}
|
|
|
|
|
|
2025-03-13 14:49:40 +02:00
|
|
|
func (s *SignalWebsocket) Connect(ctx context.Context, requestHandler RequestHandlerFunc) chan SignalWebsocketConnectionStatus {
|
2023-06-07 01:36:32 -04:00
|
|
|
go s.connectLoop(ctx, requestHandler)
|
2023-08-22 11:54:35 -04:00
|
|
|
return s.statusChannel
|
2023-06-07 01:36:32 -04:00
|
|
|
}
|
|
|
|
|
|
2025-03-13 17:43:11 +02:00
|
|
|
func (s *SignalWebsocket) pushStatus(ctx context.Context, status SignalWebsocketConnectionEvent, err error) {
|
|
|
|
|
select {
|
|
|
|
|
case s.statusChannel <- SignalWebsocketConnectionStatus{
|
|
|
|
|
Event: status,
|
|
|
|
|
Err: err,
|
|
|
|
|
}:
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
return
|
|
|
|
|
case <-time.After(5 * time.Second):
|
|
|
|
|
zerolog.Ctx(ctx).Error().Msg("Status channel didn't accept status")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2025-03-25 12:52:13 +02:00
|
|
|
func (s *SignalWebsocket) pushOutgoing(ctx context.Context, send SignalWebsocketSendMessage) error {
|
2025-07-08 19:12:33 +03:00
|
|
|
if ctx.Err() != nil {
|
|
|
|
|
return ctx.Err()
|
|
|
|
|
}
|
2025-03-25 12:52:13 +02:00
|
|
|
s.closeLock.RLock()
|
|
|
|
|
defer s.closeLock.RUnlock()
|
|
|
|
|
if s.sendChannel == nil {
|
|
|
|
|
return errors.New("connection is not open")
|
|
|
|
|
}
|
|
|
|
|
select {
|
|
|
|
|
case s.sendChannel <- send:
|
|
|
|
|
return nil
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
return ctx.Err()
|
|
|
|
|
case <-s.closeEvt.GetChan():
|
|
|
|
|
return errors.New("connection closed before send could be queued")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-01-15 14:08:00 +02:00
|
|
|
// ErrForcedReconnect is the cancellation cause ForceReconnect passes to the
// current connection's cancel function.
var ErrForcedReconnect error = errors.New("forced reconnect")
|
|
|
|
|
|
|
|
|
|
func (s *SignalWebsocket) ForceReconnect() {
|
|
|
|
|
if s == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
cancelFn := s.cancelConn.Load()
|
|
|
|
|
if cancelFn == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
(*cancelFn)(ErrForcedReconnect)
|
|
|
|
|
}
|
|
|
|
|
|
2023-06-07 01:36:32 -04:00
|
|
|
func (s *SignalWebsocket) connectLoop(
|
|
|
|
|
ctx context.Context,
|
2025-03-13 14:49:40 +02:00
|
|
|
requestHandler RequestHandlerFunc,
|
2023-06-07 01:36:32 -04:00
|
|
|
) {
|
2023-12-26 19:08:02 -07:00
|
|
|
log := zerolog.Ctx(ctx).With().
|
|
|
|
|
Str("loop", "signal_websocket_connect_loop").
|
|
|
|
|
Logger()
|
2025-04-29 23:41:49 +03:00
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
|
|
|
s.cancel.Store(&cancel)
|
2023-11-20 22:28:49 -05:00
|
|
|
|
2025-03-13 16:41:39 +02:00
|
|
|
incomingRequestChan := make(chan *signalpb.WebSocketRequestMessage, 256)
|
2023-11-20 22:28:49 -05:00
|
|
|
defer func() {
|
2025-03-25 12:52:13 +02:00
|
|
|
s.closeEvt.Set()
|
2025-04-29 23:41:49 +03:00
|
|
|
cancel()
|
2025-03-25 12:52:13 +02:00
|
|
|
|
|
|
|
|
s.closeLock.Lock()
|
|
|
|
|
defer s.closeLock.Unlock()
|
2023-11-20 22:28:49 -05:00
|
|
|
close(incomingRequestChan)
|
|
|
|
|
close(s.statusChannel)
|
|
|
|
|
close(s.sendChannel)
|
|
|
|
|
incomingRequestChan = nil
|
|
|
|
|
s.statusChannel = nil
|
|
|
|
|
s.sendChannel = nil
|
|
|
|
|
}()
|
2023-06-07 01:36:32 -04:00
|
|
|
|
2025-05-07 15:01:13 +01:00
|
|
|
const initialBackoff = 10 * time.Second
|
2023-06-07 01:36:32 -04:00
|
|
|
const backoffIncrement = 5 * time.Second
|
|
|
|
|
const maxBackoff = 60 * time.Second
|
|
|
|
|
|
2025-04-29 23:41:49 +03:00
|
|
|
if s.ws.Load() != nil {
|
2023-06-07 01:36:32 -04:00
|
|
|
panic("Already connected")
|
|
|
|
|
}
|
|
|
|
|
|
2023-08-22 11:54:35 -04:00
|
|
|
// First set up request handler loop. This exists outside of the
|
|
|
|
|
// connection loops because we want to maintain it across reconnections
|
2023-05-31 16:39:09 -04:00
|
|
|
go func() {
|
|
|
|
|
for {
|
2023-06-07 01:36:32 -04:00
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
2023-12-26 19:08:02 -07:00
|
|
|
log.Info().Msg("ctx done, stopping request loop")
|
2023-06-07 01:36:32 -04:00
|
|
|
return
|
Fix deadlock in websocket code
- Readloop goroutine gets a new request (incoming message), sends to
incoming loop
- Incoming request loop goroutine calls incoming message handler
- Incoming message handler needs group info, sends a WS request
- Sendloop goroutine gets request, sends on WS, stores response channel
in map
- Incoming message handler is blocked waiting on response on it's
response channel
- Readloop gets some other request, tries to send to incoming loop, but
gets blocked on sending to incoming request channel because it's
unbuffered and that goroutine is still blocked waiting on group info
- Server responds with response to group info request, but readloop is
blocked now forever
The fix: give the incomingRequestChan a large (10000) buffer. Now:
- Readloop goroutine gets a new request (incoming message), sends to
incoming loop
- Incoming request loop goroutine calls incoming message handler
- Incoming message handler needs group info, sends a WS request
- Sendloop goroutine gets request, sends on WS, stores response channel
in map
- Incoming message handler is blocked waiting on response on it's
response channel
- Readloop gets some other request and successfully buffers it in the
incoming request channel
- Server responds with response to group info request, readloop is *not*
blocked, and can pass the response down the response channel that the
incoming message handler is blocked on, and it can finish up
- Incoming request loop is free to continue processing incoming requests
2023-08-18 12:41:21 -04:00
|
|
|
case request, ok := <-incomingRequestChan:
|
2023-06-07 01:36:32 -04:00
|
|
|
if !ok {
|
2023-08-24 15:06:39 -04:00
|
|
|
// Main connection loop must have closed, so we should stop
|
2023-12-26 19:08:02 -07:00
|
|
|
log.Info().Msg("incomingRequestChan closed, stopping request loop")
|
2023-08-24 15:06:39 -04:00
|
|
|
return
|
2023-06-07 01:36:32 -04:00
|
|
|
}
|
|
|
|
|
if request == nil {
|
2023-12-26 19:08:02 -07:00
|
|
|
log.Fatal().Msg("Received nil request")
|
2023-06-07 01:36:32 -04:00
|
|
|
}
|
2023-06-26 10:43:33 -04:00
|
|
|
if requestHandler == nil {
|
2023-12-26 19:08:02 -07:00
|
|
|
log.Fatal().Msg("Received request but no handler")
|
2023-06-26 10:43:33 -04:00
|
|
|
}
|
Fix deadlock in websocket code
- Readloop goroutine gets a new request (incoming message), sends to
incoming loop
- Incoming request loop goroutine calls incoming message handler
- Incoming message handler needs group info, sends a WS request
- Sendloop goroutine gets request, sends on WS, stores response channel
in map
- Incoming message handler is blocked waiting on response on it's
response channel
- Readloop gets some other request, tries to send to incoming loop, but
gets blocked on sending to incoming request channel because it's
unbuffered and that goroutine is still blocked waiting on group info
- Server responds with response to group info request, but readloop is
blocked now forever
The fix: give the incomingRequestChan a large (10000) buffer. Now:
- Readloop goroutine gets a new request (incoming message), sends to
incoming loop
- Incoming request loop goroutine calls incoming message handler
- Incoming message handler needs group info, sends a WS request
- Sendloop goroutine gets request, sends on WS, stores response channel
in map
- Incoming message handler is blocked waiting on response on it's
response channel
- Readloop gets some other request and successfully buffers it in the
incoming request channel
- Server responds with response to group info request, readloop is *not*
blocked, and can pass the response down the response channel that the
incoming message handler is blocked on, and it can finish up
- Incoming request loop is free to continue processing incoming requests
2023-08-18 12:41:21 -04:00
|
|
|
|
|
|
|
|
// Handle the request with the request handler function
|
2025-03-13 14:49:40 +02:00
|
|
|
response, err := requestHandler(ctx, request)
|
Fix deadlock in websocket code
- Readloop goroutine gets a new request (incoming message), sends to
incoming loop
- Incoming request loop goroutine calls incoming message handler
- Incoming message handler needs group info, sends a WS request
- Sendloop goroutine gets request, sends on WS, stores response channel
in map
- Incoming message handler is blocked waiting on response on it's
response channel
- Readloop gets some other request, tries to send to incoming loop, but
gets blocked on sending to incoming request channel because it's
unbuffered and that goroutine is still blocked waiting on group info
- Server responds with response to group info request, but readloop is
blocked now forever
The fix: give the incomingRequestChan a large (10000) buffer. Now:
- Readloop goroutine gets a new request (incoming message), sends to
incoming loop
- Incoming request loop goroutine calls incoming message handler
- Incoming message handler needs group info, sends a WS request
- Sendloop goroutine gets request, sends on WS, stores response channel
in map
- Incoming message handler is blocked waiting on response on it's
response channel
- Readloop gets some other request and successfully buffers it in the
incoming request channel
- Server responds with response to group info request, readloop is *not*
blocked, and can pass the response down the response channel that the
incoming message handler is blocked on, and it can finish up
- Incoming request loop is free to continue processing incoming requests
2023-08-18 12:41:21 -04:00
|
|
|
|
2023-05-31 16:39:09 -04:00
|
|
|
if err != nil {
|
2025-03-13 14:48:09 +02:00
|
|
|
log.Err(err).Uint64("request_id", request.GetId()).Msg("Error handling request")
|
2025-03-25 12:52:13 +02:00
|
|
|
} else if response != nil {
|
|
|
|
|
err = s.pushOutgoing(ctx, SignalWebsocketSendMessage{
|
2023-06-07 01:36:32 -04:00
|
|
|
RequestMessage: request,
|
|
|
|
|
ResponseMessage: response,
|
2025-03-25 12:52:13 +02:00
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Err(err).Uint64("request_id", request.GetId()).Msg("Error queuing response message")
|
2023-06-07 01:36:32 -04:00
|
|
|
}
|
2025-03-13 14:48:09 +02:00
|
|
|
} else {
|
2025-03-25 12:52:13 +02:00
|
|
|
log.Warn().Uint64("request_id", request.GetId()).Msg("Request handler didn't return a response nor an error")
|
2023-05-31 16:39:09 -04:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
2023-06-07 01:36:32 -04:00
|
|
|
// Main connection loop - if there's a problem with anything just
|
|
|
|
|
// kill everything (including the websocket) and build it all up again
|
2024-12-17 12:39:43 +02:00
|
|
|
backoff := initialBackoff
|
2023-08-24 15:06:39 -04:00
|
|
|
retrying := false
|
2023-11-22 23:06:41 -05:00
|
|
|
errorCount := 0
|
2024-12-17 12:48:56 +02:00
|
|
|
isFirstConnect := true
|
2025-01-15 23:40:50 +02:00
|
|
|
wsURL := (&url.URL{
|
|
|
|
|
Scheme: "wss",
|
|
|
|
|
Host: APIHostname,
|
|
|
|
|
Path: WebsocketPath,
|
|
|
|
|
User: s.basicAuth,
|
|
|
|
|
}).String()
|
2023-06-07 01:36:32 -04:00
|
|
|
for {
|
|
|
|
|
if retrying {
|
|
|
|
|
if backoff > maxBackoff {
|
|
|
|
|
backoff = maxBackoff
|
|
|
|
|
}
|
2023-12-26 19:08:02 -07:00
|
|
|
log.Warn().Dur("backoff", backoff).Msg("Failed to connect, waiting to retry...")
|
2025-05-07 15:01:34 +01:00
|
|
|
select {
|
|
|
|
|
case <-time.After(backoff):
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
}
|
2023-06-07 01:36:32 -04:00
|
|
|
backoff += backoffIncrement
|
2025-05-07 15:01:34 +01:00
|
|
|
} else if !isFirstConnect && s.basicAuth != nil {
|
|
|
|
|
select {
|
|
|
|
|
case <-time.After(initialBackoff):
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
}
|
2023-06-07 01:36:32 -04:00
|
|
|
}
|
2023-08-24 15:06:39 -04:00
|
|
|
if ctx.Err() != nil {
|
2023-12-26 19:08:02 -07:00
|
|
|
log.Info().Msg("ctx done, stopping connection loop")
|
2023-08-24 15:06:39 -04:00
|
|
|
return
|
|
|
|
|
}
|
2024-12-17 12:48:56 +02:00
|
|
|
isFirstConnect = false
|
2023-06-07 01:36:32 -04:00
|
|
|
|
2025-01-15 23:40:50 +02:00
|
|
|
ws, resp, err := OpenWebsocket(ctx, wsURL)
|
2023-08-24 15:06:39 -04:00
|
|
|
if resp != nil {
|
|
|
|
|
if resp.StatusCode != 101 {
|
|
|
|
|
// Server didn't want to open websocket
|
2023-08-22 11:54:35 -04:00
|
|
|
if resp.StatusCode >= 500 {
|
|
|
|
|
// We can try again if it's a 5xx
|
2025-03-13 17:43:11 +02:00
|
|
|
s.pushStatus(ctx, SignalWebsocketConnectionEventDisconnected, fmt.Errorf("5xx opening websocket: %v", resp.Status))
|
2023-08-24 15:06:39 -04:00
|
|
|
} else if resp.StatusCode == 403 {
|
|
|
|
|
// We are logged out, so we should stop trying to reconnect
|
2025-03-13 17:43:11 +02:00
|
|
|
s.pushStatus(ctx, SignalWebsocketConnectionEventLoggedOut, fmt.Errorf("403 opening websocket, we are logged out"))
|
2023-08-24 15:06:39 -04:00
|
|
|
return // NOT RETRYING, KILLING THE CONNECTION LOOP
|
|
|
|
|
} else if resp.StatusCode > 0 && resp.StatusCode < 500 {
|
|
|
|
|
// Unexpected status code
|
2025-10-20 11:52:14 +03:00
|
|
|
s.pushStatus(ctx, SignalWebsocketConnectionEventFatalError, fmt.Errorf("unexpected status opening websocket: %v", resp.Status))
|
2023-08-24 15:06:39 -04:00
|
|
|
return // NOT RETRYING, KILLING THE CONNECTION LOOP
|
2023-08-22 11:54:35 -04:00
|
|
|
} else {
|
|
|
|
|
// Something is very wrong
|
2025-03-13 17:43:11 +02:00
|
|
|
s.pushStatus(ctx, SignalWebsocketConnectionEventError, fmt.Errorf("unexpected error opening websocket: %v", resp.Status))
|
2023-08-24 15:06:39 -04:00
|
|
|
}
|
|
|
|
|
// Retry the connection
|
|
|
|
|
retrying = true
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
// Unexpected error opening websocket
|
|
|
|
|
if backoff < maxBackoff {
|
2025-03-13 17:43:11 +02:00
|
|
|
s.pushStatus(ctx, SignalWebsocketConnectionEventDisconnected, fmt.Errorf("transient error opening websocket: %w", err))
|
2023-08-24 15:06:39 -04:00
|
|
|
} else {
|
2025-03-13 17:43:11 +02:00
|
|
|
s.pushStatus(ctx, SignalWebsocketConnectionEventError, fmt.Errorf("continuing error opening websocket: %w", err))
|
2023-06-07 01:36:32 -04:00
|
|
|
}
|
|
|
|
|
retrying = true
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Succssfully connected
|
2025-03-13 17:43:11 +02:00
|
|
|
s.pushStatus(ctx, SignalWebsocketConnectionEventConnected, nil)
|
2025-04-29 23:41:49 +03:00
|
|
|
s.ws.Store(ws)
|
2023-06-07 01:36:32 -04:00
|
|
|
retrying = false
|
2024-12-17 12:39:43 +02:00
|
|
|
backoff = initialBackoff
|
2023-06-07 01:36:32 -04:00
|
|
|
|
2025-11-24 15:20:48 +00:00
|
|
|
responseChannels := exsync.NewMap[uint64, chan *signalpb.WebSocketResponseMessage]()
|
2023-06-07 01:36:32 -04:00
|
|
|
loopCtx, loopCancel := context.WithCancelCause(ctx)
|
2026-01-15 14:08:00 +02:00
|
|
|
s.cancelConn.Store(&loopCancel)
|
2024-12-17 12:41:35 +02:00
|
|
|
var wg sync.WaitGroup
|
|
|
|
|
wg.Add(3)
|
2023-06-07 01:36:32 -04:00
|
|
|
|
|
|
|
|
// Read loop (for reading incoming reqeusts and responses to outgoing requests)
|
|
|
|
|
go func() {
|
2024-12-17 12:41:35 +02:00
|
|
|
defer wg.Done()
|
2024-12-17 12:39:43 +02:00
|
|
|
err := readLoop(loopCtx, ws, incomingRequestChan, responseChannels)
|
2023-10-02 00:30:09 -04:00
|
|
|
// Don't want to put an err into loopCancel if we don't have one
|
|
|
|
|
if err != nil {
|
|
|
|
|
err = fmt.Errorf("error in readLoop: %w", err)
|
|
|
|
|
}
|
2025-04-29 23:41:49 +03:00
|
|
|
if s.closeCalled.Load() {
|
2025-04-29 14:12:08 +03:00
|
|
|
// Exit during Close() so cancel the reconnect loop as well
|
2025-04-29 23:41:49 +03:00
|
|
|
cancel()
|
2025-04-29 14:12:08 +03:00
|
|
|
}
|
2023-10-02 00:30:09 -04:00
|
|
|
loopCancel(err)
|
2023-12-26 19:08:02 -07:00
|
|
|
log.Info().Msg("readLoop exited")
|
2023-06-07 01:36:32 -04:00
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
// Write loop (for sending outgoing requests and responses to incoming requests)
|
|
|
|
|
go func() {
|
2024-12-17 12:41:35 +02:00
|
|
|
defer wg.Done()
|
2024-12-17 12:39:43 +02:00
|
|
|
err := writeLoop(loopCtx, ws, s.sendChannel, responseChannels)
|
2023-10-02 00:30:09 -04:00
|
|
|
// Don't want to put an err into loopCancel if we don't have one
|
|
|
|
|
if err != nil {
|
|
|
|
|
err = fmt.Errorf("error in writeLoop: %w", err)
|
|
|
|
|
}
|
|
|
|
|
loopCancel(err)
|
2023-12-26 19:08:02 -07:00
|
|
|
log.Info().Msg("writeLoop exited")
|
2023-06-07 01:36:32 -04:00
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
// Ping loop (send a keepalive Ping every 30s)
|
|
|
|
|
go func() {
|
2024-12-17 12:41:35 +02:00
|
|
|
defer wg.Done()
|
2026-01-09 12:08:09 +00:00
|
|
|
ticker := time.NewTicker(WebsocketPingInterval)
|
2023-06-07 01:36:32 -04:00
|
|
|
defer ticker.Stop()
|
|
|
|
|
|
2025-01-07 19:08:27 +02:00
|
|
|
pingTimeoutCount := 0
|
2023-06-07 01:36:32 -04:00
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-ticker.C:
|
2026-01-09 12:08:09 +00:00
|
|
|
pingCtx, cancel := context.WithTimeout(loopCtx, WebsocketPingTimeout)
|
2024-05-01 09:17:15 +01:00
|
|
|
err := ws.Ping(pingCtx)
|
|
|
|
|
cancel()
|
2023-06-07 01:36:32 -04:00
|
|
|
if err != nil {
|
2025-01-07 19:08:27 +02:00
|
|
|
pingTimeoutCount++
|
2025-01-10 17:20:36 +02:00
|
|
|
log.Err(err).Msg("Failed to send ping")
|
2026-01-09 12:08:09 +00:00
|
|
|
if pingTimeoutCount >= WebsocketPingTimeoutLimit {
|
2025-01-07 19:08:27 +02:00
|
|
|
log.Warn().Msg("Ping timeout count exceeded, closing websocket")
|
|
|
|
|
err = ws.Close(websocket.StatusNormalClosure, "Ping timeout")
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Err(err).Msg("Error closing websocket after ping timeout")
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
2025-01-10 17:20:36 +02:00
|
|
|
} else if pingTimeoutCount > 0 {
|
2025-01-07 19:08:27 +02:00
|
|
|
pingTimeoutCount = 0
|
2025-01-10 17:20:36 +02:00
|
|
|
log.Debug().Msg("Recovered from ping error")
|
|
|
|
|
} else {
|
2025-01-07 19:08:27 +02:00
|
|
|
log.Trace().Msg("Sent keepalive")
|
2023-06-07 01:36:32 -04:00
|
|
|
}
|
|
|
|
|
case <-loopCtx.Done():
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
2023-08-22 11:54:35 -04:00
|
|
|
// Wait for read or write or ping loop to exit (which means there was an error)
|
2025-02-26 16:47:51 +02:00
|
|
|
log.Debug().Msg("Finished preparing connection, waiting for loop context to finish")
|
|
|
|
|
<-loopCtx.Done()
|
|
|
|
|
ctxCauseErr := context.Cause(loopCtx)
|
|
|
|
|
log.Debug().AnErr("ctx_cause_err", ctxCauseErr).Msg("Read or write loop exited")
|
|
|
|
|
if ctxCauseErr == nil || errors.Is(ctxCauseErr, context.Canceled) {
|
2025-03-13 17:43:11 +02:00
|
|
|
s.pushStatus(ctx, SignalWebsocketConnectionEventCleanShutdown, nil)
|
2025-02-26 16:47:51 +02:00
|
|
|
} else {
|
|
|
|
|
errorCount++
|
2025-03-20 17:37:53 +02:00
|
|
|
s.pushStatus(ctx, SignalWebsocketConnectionEventDisconnected, ctxCauseErr)
|
2026-01-15 14:08:00 +02:00
|
|
|
if errors.Is(ctxCauseErr, ErrForcedReconnect) {
|
|
|
|
|
// Skip the delay for forced reconnects
|
|
|
|
|
// TODO should the delay be lowered globally?
|
|
|
|
|
isFirstConnect = true
|
|
|
|
|
}
|
2023-06-07 01:36:32 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Clean up
|
2024-12-17 12:39:43 +02:00
|
|
|
ws.Close(websocket.StatusGoingAway, "Going away")
|
2025-11-24 15:20:48 +00:00
|
|
|
for _, responseChannel := range responseChannels.SwapData(nil) {
|
2023-06-07 01:36:32 -04:00
|
|
|
close(responseChannel)
|
|
|
|
|
}
|
|
|
|
|
loopCancel(nil)
|
2024-12-17 12:41:35 +02:00
|
|
|
wg.Wait()
|
2023-12-26 19:08:02 -07:00
|
|
|
log.Debug().Msg("Finished websocket cleanup")
|
2023-11-22 23:06:41 -05:00
|
|
|
if errorCount > 500 {
|
|
|
|
|
// Something is really wrong, we better panic.
|
|
|
|
|
// This is a last defense against a runaway error loop,
|
|
|
|
|
// like the WS continually closing and reconnecting
|
2023-12-26 19:08:02 -07:00
|
|
|
log.Fatal().Int("error_count", errorCount).Msg("Too many errors, panicking")
|
2023-11-22 23:06:41 -05:00
|
|
|
}
|
2023-06-07 01:36:32 -04:00
|
|
|
}
|
2023-05-31 16:39:09 -04:00
|
|
|
}
|
|
|
|
|
|
2023-06-07 01:36:32 -04:00
|
|
|
func readLoop(
|
|
|
|
|
ctx context.Context,
|
|
|
|
|
ws *websocket.Conn,
|
Fix deadlock in websocket code
- Readloop goroutine gets a new request (incoming message), sends to
incoming loop
- Incoming request loop goroutine calls incoming message handler
- Incoming message handler needs group info, sends a WS request
- Sendloop goroutine gets request, sends on WS, stores response channel
in map
- Incoming message handler is blocked waiting on response on it's
response channel
- Readloop gets some other request, tries to send to incoming loop, but
gets blocked on sending to incoming request channel because it's
unbuffered and that goroutine is still blocked waiting on group info
- Server responds with response to group info request, but readloop is
blocked now forever
The fix: give the incomingRequestChan a large (10000) buffer. Now:
- Readloop goroutine gets a new request (incoming message), sends to
incoming loop
- Incoming request loop goroutine calls incoming message handler
- Incoming message handler needs group info, sends a WS request
- Sendloop goroutine gets request, sends on WS, stores response channel
in map
- Incoming message handler is blocked waiting on response on it's
response channel
- Readloop gets some other request and successfully buffers it in the
incoming request channel
- Server responds with response to group info request, readloop is *not*
blocked, and can pass the response down the response channel that the
incoming message handler is blocked on, and it can finish up
- Incoming request loop is free to continue processing incoming requests
2023-08-18 12:41:21 -04:00
|
|
|
incomingRequestChan chan *signalpb.WebSocketRequestMessage,
|
2025-11-24 15:20:48 +00:00
|
|
|
responseChannels *exsync.Map[uint64, chan *signalpb.WebSocketResponseMessage],
|
2023-06-07 01:36:32 -04:00
|
|
|
) error {
|
2023-12-26 19:08:02 -07:00
|
|
|
log := zerolog.Ctx(ctx).With().
|
|
|
|
|
Str("loop", "signal_websocket_read_loop").
|
|
|
|
|
Logger()
|
2023-06-07 01:36:32 -04:00
|
|
|
for {
|
|
|
|
|
if ctx.Err() != nil {
|
|
|
|
|
return ctx.Err()
|
|
|
|
|
}
|
|
|
|
|
msg := &signalpb.WebSocketMessage{}
|
|
|
|
|
//ctx, _ := context.WithTimeout(ctx, 10*time.Second) // For testing
|
|
|
|
|
err := wspb.Read(ctx, ws, msg)
|
2023-05-31 16:39:09 -04:00
|
|
|
if err != nil {
|
2023-09-27 14:25:02 -04:00
|
|
|
if err == context.Canceled {
|
2023-12-26 19:08:02 -07:00
|
|
|
log.Info().Msg("readLoop context canceled")
|
2024-12-17 12:39:43 +02:00
|
|
|
} else if websocket.CloseStatus(err) == websocket.StatusNormalClosure {
|
2023-12-26 19:08:02 -07:00
|
|
|
log.Info().Msg("readLoop received StatusNormalClosure")
|
2023-09-27 14:25:02 -04:00
|
|
|
return nil
|
|
|
|
|
}
|
2023-09-30 14:39:26 +03:00
|
|
|
return fmt.Errorf("error reading message: %w", err)
|
2023-05-31 16:39:09 -04:00
|
|
|
}
|
2023-06-07 01:36:32 -04:00
|
|
|
if msg.Type == nil {
|
2024-01-06 10:44:36 -07:00
|
|
|
return errors.New("received message with no type")
|
2023-06-07 01:36:32 -04:00
|
|
|
} else if *msg.Type == signalpb.WebSocketMessage_REQUEST {
|
|
|
|
|
if msg.Request == nil {
|
2024-01-06 10:44:36 -07:00
|
|
|
return errors.New("received request message with no request")
|
2023-06-07 01:36:32 -04:00
|
|
|
}
|
2025-06-09 21:13:54 +03:00
|
|
|
log.Trace().
|
2023-12-26 19:08:02 -07:00
|
|
|
Uint64("request_id", *msg.Request.Id).
|
|
|
|
|
Str("request_verb", *msg.Request.Verb).
|
|
|
|
|
Str("request_path", *msg.Request.Path).
|
|
|
|
|
Msg("Received WS request")
|
Fix deadlock in websocket code
- Readloop goroutine gets a new request (incoming message), sends to
incoming loop
- Incoming request loop goroutine calls incoming message handler
- Incoming message handler needs group info, sends a WS request
- Sendloop goroutine gets request, sends on WS, stores response channel
in map
- Incoming message handler is blocked waiting on response on it's
response channel
- Readloop gets some other request, tries to send to incoming loop, but
gets blocked on sending to incoming request channel because it's
unbuffered and that goroutine is still blocked waiting on group info
- Server responds with response to group info request, but readloop is
blocked now forever
The fix: give the incomingRequestChan a large (10000) buffer. Now:
- Readloop goroutine gets a new request (incoming message), sends to
incoming loop
- Incoming request loop goroutine calls incoming message handler
- Incoming message handler needs group info, sends a WS request
- Sendloop goroutine gets request, sends on WS, stores response channel
in map
- Incoming message handler is blocked waiting on response on it's
response channel
- Readloop gets some other request and successfully buffers it in the
incoming request channel
- Server responds with response to group info request, readloop is *not*
blocked, and can pass the response down the response channel that the
incoming message handler is blocked on, and it can finish up
- Incoming request loop is free to continue processing incoming requests
2023-08-18 12:41:21 -04:00
|
|
|
incomingRequestChan <- msg.Request
|
2023-06-07 01:36:32 -04:00
|
|
|
} else if *msg.Type == signalpb.WebSocketMessage_RESPONSE {
|
|
|
|
|
if msg.Response == nil {
|
2023-12-26 19:08:02 -07:00
|
|
|
log.Fatal().Msg("Received response with no response")
|
2023-06-07 01:36:32 -04:00
|
|
|
}
|
|
|
|
|
if msg.Response.Id == nil {
|
2023-12-26 19:08:02 -07:00
|
|
|
log.Fatal().Msg("Received response with no id")
|
2023-06-07 01:36:32 -04:00
|
|
|
}
|
2025-11-24 15:20:48 +00:00
|
|
|
responseChannel, ok := responseChannels.Pop(*msg.Response.Id)
|
2023-06-07 01:36:32 -04:00
|
|
|
if !ok {
|
2023-12-26 19:08:02 -07:00
|
|
|
log.Warn().
|
|
|
|
|
Uint64("response_id", *msg.Response.Id).
|
|
|
|
|
Msg("Received response with unknown id")
|
2023-06-07 01:36:32 -04:00
|
|
|
continue
|
|
|
|
|
}
|
2025-11-24 17:57:18 +02:00
|
|
|
logEvt := log.Debug().
|
|
|
|
|
Uint64("response_id", msg.Response.GetId()).
|
|
|
|
|
Uint32("response_status", msg.Response.GetStatus()).
|
|
|
|
|
Str("response_message", msg.Response.GetMessage())
|
2025-11-27 17:15:26 +02:00
|
|
|
if log.GetLevel() == zerolog.TraceLevel || len(msg.Response.Body) < 256 {
|
2025-11-24 17:57:18 +02:00
|
|
|
logEvt.Strs("response_headers", msg.Response.Headers)
|
|
|
|
|
if json.Valid(msg.Response.Body) {
|
|
|
|
|
logEvt.RawJSON("response_body", msg.Response.Body)
|
|
|
|
|
} else {
|
|
|
|
|
logEvt.Str("response_body", base64.StdEncoding.EncodeToString(msg.Response.Body))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
logEvt.Msg("Received WS response")
|
2023-06-07 01:36:32 -04:00
|
|
|
responseChannel <- msg.Response
|
|
|
|
|
close(responseChannel)
|
|
|
|
|
} else if *msg.Type == signalpb.WebSocketMessage_UNKNOWN {
|
2024-01-06 10:44:36 -07:00
|
|
|
return fmt.Errorf("received message with unknown type: %v", *msg.Type)
|
2023-06-07 01:36:32 -04:00
|
|
|
} else {
|
2024-01-06 10:44:36 -07:00
|
|
|
return fmt.Errorf("received message with actually unknown type: %v", *msg.Type)
|
2023-05-31 16:39:09 -04:00
|
|
|
}
|
|
|
|
|
}
|
2023-06-07 01:36:32 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type SignalWebsocketSendMessage struct {
|
|
|
|
|
// Populate if we're sending a request:
|
2023-09-06 00:47:08 -04:00
|
|
|
RequestTime time.Time
|
2023-06-07 01:36:32 -04:00
|
|
|
ResponseChannel chan *signalpb.WebSocketResponseMessage
|
|
|
|
|
// Populate if we're sending a response:
|
|
|
|
|
ResponseMessage *SimpleResponse
|
|
|
|
|
// Populate this for request AND response
|
|
|
|
|
RequestMessage *signalpb.WebSocketRequestMessage
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func writeLoop(
|
|
|
|
|
ctx context.Context,
|
|
|
|
|
ws *websocket.Conn,
|
2023-08-22 11:54:35 -04:00
|
|
|
sendChannel chan SignalWebsocketSendMessage,
|
2025-11-24 15:20:48 +00:00
|
|
|
responseChannels *exsync.Map[uint64, chan *signalpb.WebSocketResponseMessage],
|
2023-06-07 01:36:32 -04:00
|
|
|
) error {
|
2023-12-26 19:08:02 -07:00
|
|
|
log := zerolog.Ctx(ctx).With().
|
|
|
|
|
Str("loop", "signal_websocket_write_loop").
|
|
|
|
|
Logger()
|
2023-06-29 21:36:09 -04:00
|
|
|
for i := uint64(1); ; i++ {
|
2023-06-07 01:36:32 -04:00
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
2023-08-22 11:54:35 -04:00
|
|
|
if ctx.Err() != nil && ctx.Err() != context.Canceled {
|
|
|
|
|
return ctx.Err()
|
|
|
|
|
}
|
|
|
|
|
return nil
|
2023-06-07 01:36:32 -04:00
|
|
|
case request, ok := <-sendChannel:
|
|
|
|
|
if !ok {
|
2024-12-17 12:39:43 +02:00
|
|
|
return errors.New("send channel closed")
|
2023-06-07 01:36:32 -04:00
|
|
|
}
|
|
|
|
|
if request.RequestMessage != nil && request.ResponseChannel != nil {
|
|
|
|
|
msgType := signalpb.WebSocketMessage_REQUEST
|
|
|
|
|
message := &signalpb.WebSocketMessage{
|
|
|
|
|
Type: &msgType,
|
|
|
|
|
Request: request.RequestMessage,
|
|
|
|
|
}
|
|
|
|
|
request.RequestMessage.Id = &i
|
2025-11-24 15:20:48 +00:00
|
|
|
responseChannels.Set(i, request.ResponseChannel)
|
2024-12-17 12:39:43 +02:00
|
|
|
if !request.RequestTime.IsZero() {
|
2023-09-06 00:47:08 -04:00
|
|
|
elapsed := time.Since(request.RequestTime)
|
|
|
|
|
if elapsed > 1*time.Minute {
|
2024-12-17 12:39:43 +02:00
|
|
|
return fmt.Errorf("request too old (%v), not sending", elapsed)
|
2023-09-06 00:47:08 -04:00
|
|
|
} else if elapsed > 10*time.Second {
|
2023-12-26 19:08:02 -07:00
|
|
|
log.Warn().
|
|
|
|
|
Uint64("request_id", i).
|
|
|
|
|
Str("request_verb", *request.RequestMessage.Verb).
|
2025-11-27 16:54:12 +02:00
|
|
|
Str("request_path", *request.RequestMessage.Path).
|
2023-12-26 19:08:02 -07:00
|
|
|
Dur("elapsed", elapsed).
|
|
|
|
|
Msg("Sending WS request")
|
2023-09-06 00:47:08 -04:00
|
|
|
} else {
|
2023-12-26 19:08:02 -07:00
|
|
|
log.Debug().
|
|
|
|
|
Uint64("request_id", i).
|
|
|
|
|
Str("request_verb", *request.RequestMessage.Verb).
|
2025-11-27 16:54:12 +02:00
|
|
|
Str("request_path", *request.RequestMessage.Path).
|
2023-12-26 19:08:02 -07:00
|
|
|
Dur("elapsed", elapsed).
|
|
|
|
|
Msg("Sending WS request")
|
2023-09-06 00:47:08 -04:00
|
|
|
}
|
|
|
|
|
}
|
2023-06-07 01:36:32 -04:00
|
|
|
err := wspb.Write(ctx, ws, message)
|
|
|
|
|
if err != nil {
|
2023-09-06 00:47:08 -04:00
|
|
|
if ctx.Err() != nil && ctx.Err() != context.Canceled {
|
|
|
|
|
return ctx.Err()
|
|
|
|
|
}
|
2023-09-30 14:39:26 +03:00
|
|
|
return fmt.Errorf("error writing request message: %w", err)
|
2023-06-07 01:36:32 -04:00
|
|
|
}
|
|
|
|
|
} else if request.RequestMessage != nil && request.ResponseMessage != nil {
|
2024-01-06 10:44:36 -07:00
|
|
|
message := CreateWSResponse(ctx, *request.RequestMessage.Id, request.ResponseMessage.Status)
|
2023-12-26 19:08:02 -07:00
|
|
|
log.Debug().
|
|
|
|
|
Uint64("request_id", *request.RequestMessage.Id).
|
|
|
|
|
Int("response_status", request.ResponseMessage.Status).
|
|
|
|
|
Msg("Sending WS response")
|
2025-05-12 18:37:19 +03:00
|
|
|
writeStartTime := time.Now()
|
2023-06-07 01:36:32 -04:00
|
|
|
err := wspb.Write(ctx, ws, message)
|
|
|
|
|
if err != nil {
|
2023-09-30 14:39:26 +03:00
|
|
|
return fmt.Errorf("error writing response message: %w", err)
|
2023-06-07 01:36:32 -04:00
|
|
|
}
|
2025-05-12 18:37:19 +03:00
|
|
|
if request.ResponseMessage.WriteCallback != nil {
|
|
|
|
|
request.ResponseMessage.WriteCallback(writeStartTime)
|
|
|
|
|
}
|
2023-06-07 01:36:32 -04:00
|
|
|
} else {
|
2024-01-06 10:44:36 -07:00
|
|
|
return fmt.Errorf("invalid request: %+v", request)
|
2023-06-07 01:36:32 -04:00
|
|
|
}
|
2023-05-31 16:39:09 -04:00
|
|
|
}
|
|
|
|
|
}
|
2023-06-07 01:36:32 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *SignalWebsocket) SendRequest(
|
|
|
|
|
ctx context.Context,
|
2025-11-24 14:34:29 +02:00
|
|
|
method,
|
|
|
|
|
path string,
|
|
|
|
|
body []byte,
|
|
|
|
|
headers http.Header,
|
2023-09-06 00:47:08 -04:00
|
|
|
) (*signalpb.WebSocketResponseMessage, error) {
|
2025-07-07 15:35:45 +03:00
|
|
|
if s == nil {
|
|
|
|
|
return nil, errors.New("websocket is nil")
|
|
|
|
|
}
|
2025-11-24 14:34:29 +02:00
|
|
|
headerArray := make([]string, len(headers))
|
2025-11-24 15:03:02 +02:00
|
|
|
var hasContentType bool
|
2025-11-24 14:34:29 +02:00
|
|
|
for key, values := range headers {
|
2025-11-24 15:03:02 +02:00
|
|
|
if strings.ToLower(key) == "content-type" {
|
|
|
|
|
hasContentType = true
|
|
|
|
|
}
|
2025-11-24 14:34:29 +02:00
|
|
|
for _, value := range values {
|
|
|
|
|
headerArray = append(headerArray, fmt.Sprintf("%s:%s", strings.ToLower(key), value))
|
|
|
|
|
}
|
|
|
|
|
}
|
2025-11-24 15:03:02 +02:00
|
|
|
if !hasContentType && body != nil {
|
|
|
|
|
headerArray = append(headerArray, "content-type:application/json")
|
|
|
|
|
}
|
2025-11-24 14:34:29 +02:00
|
|
|
return s.sendRequestInternal(ctx, &signalpb.WebSocketRequestMessage{
|
|
|
|
|
Verb: &method,
|
|
|
|
|
Path: &path,
|
|
|
|
|
Body: body,
|
|
|
|
|
Headers: headerArray,
|
|
|
|
|
}, time.Now(), 0)
|
2023-09-06 00:47:08 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *SignalWebsocket) sendRequestInternal(
|
|
|
|
|
ctx context.Context,
|
|
|
|
|
request *signalpb.WebSocketRequestMessage,
|
|
|
|
|
startTime time.Time,
|
|
|
|
|
retryCount int,
|
|
|
|
|
) (*signalpb.WebSocketResponseMessage, error) {
|
2023-07-14 15:00:25 -04:00
|
|
|
if s.basicAuth != nil {
|
2025-01-15 23:40:50 +02:00
|
|
|
request.Headers = append(request.Headers, "authorization:Basic "+s.basicAuth.String())
|
2023-07-14 15:00:25 -04:00
|
|
|
}
|
2023-06-07 01:36:32 -04:00
|
|
|
responseChannel := make(chan *signalpb.WebSocketResponseMessage, 1)
|
2025-03-25 12:52:13 +02:00
|
|
|
err := s.pushOutgoing(ctx, SignalWebsocketSendMessage{
|
2023-06-07 01:36:32 -04:00
|
|
|
RequestMessage: request,
|
|
|
|
|
ResponseChannel: responseChannel,
|
2023-09-06 00:47:08 -04:00
|
|
|
RequestTime: startTime,
|
2025-03-25 12:52:13 +02:00
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
2023-09-06 00:47:08 -04:00
|
|
|
}
|
|
|
|
|
response := <-responseChannel
|
|
|
|
|
|
2025-11-24 18:18:50 +02:00
|
|
|
isSelfDelete := request.GetVerb() == http.MethodDelete && strings.HasPrefix(request.GetPath(), "/v1/devices/")
|
|
|
|
|
if response == nil && !isSelfDelete {
|
2023-09-06 00:47:08 -04:00
|
|
|
// If out of retries, return error no matter what
|
|
|
|
|
if retryCount >= 3 {
|
|
|
|
|
// TODO: I think error isn't getting passed in this context (as it's not the one in writeLoop)
|
|
|
|
|
if ctx.Err() != nil {
|
2023-09-30 14:39:26 +03:00
|
|
|
return nil, fmt.Errorf("retried 3 times, giving up: %w", ctx.Err())
|
2023-09-06 00:47:08 -04:00
|
|
|
} else {
|
2025-03-25 12:52:13 +02:00
|
|
|
return nil, errors.New("retried 3 times, giving up")
|
2023-09-06 00:47:08 -04:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if ctx.Err() != nil {
|
|
|
|
|
// if error contains "Took too long" don't retry
|
|
|
|
|
if strings.Contains(ctx.Err().Error(), "Took too long") {
|
|
|
|
|
return nil, ctx.Err()
|
|
|
|
|
}
|
|
|
|
|
}
|
2023-12-26 19:08:02 -07:00
|
|
|
zerolog.Ctx(ctx).Warn().Int("retry_count", retryCount).Msg("Received nil response, retrying recursively")
|
2023-09-06 00:47:08 -04:00
|
|
|
return s.sendRequestInternal(ctx, request, startTime, retryCount+1)
|
2023-06-07 01:36:32 -04:00
|
|
|
}
|
2023-09-06 00:47:08 -04:00
|
|
|
return response, nil
|
2023-05-31 16:39:09 -04:00
|
|
|
}
|
|
|
|
|
|
2025-01-15 23:40:50 +02:00
|
|
|
func OpenWebsocket(ctx context.Context, url string) (*websocket.Conn, *http.Response, error) {
|
2023-05-31 16:39:09 -04:00
|
|
|
opt := &websocket.DialOptions{
|
2024-01-13 16:20:49 +02:00
|
|
|
HTTPClient: SignalHTTPClient,
|
2024-01-14 13:31:17 +02:00
|
|
|
HTTPHeader: make(http.Header, 2),
|
2023-05-31 16:39:09 -04:00
|
|
|
}
|
2024-01-14 13:31:17 +02:00
|
|
|
opt.HTTPHeader.Set("User-Agent", UserAgent)
|
|
|
|
|
opt.HTTPHeader.Set("X-Signal-Agent", SignalAgent)
|
|
|
|
|
ws, resp, err := websocket.Dial(ctx, url, opt)
|
2023-11-22 23:06:41 -05:00
|
|
|
if ws != nil {
|
|
|
|
|
ws.SetReadLimit(1 << 20) // Increase read limit to 1MB from default of 32KB
|
|
|
|
|
}
|
2023-05-31 16:39:09 -04:00
|
|
|
return ws, resp, err
|
|
|
|
|
}
|
|
|
|
|
|
2024-01-06 10:44:36 -07:00
|
|
|
func CreateWSResponse(ctx context.Context, id uint64, status int) *signalpb.WebSocketMessage {
|
2023-05-31 16:39:09 -04:00
|
|
|
if status != 200 && status != 400 {
|
2023-07-27 18:13:33 -04:00
|
|
|
// TODO support more responses to Signal? Are there more?
|
2024-01-06 10:44:36 -07:00
|
|
|
zerolog.Ctx(ctx).Fatal().Int("status", status).Msg("Error creating response. Non 200/400 not supported yet.")
|
2023-05-31 16:39:09 -04:00
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
msg_type := signalpb.WebSocketMessage_RESPONSE
|
|
|
|
|
message := "OK"
|
|
|
|
|
if status == 400 {
|
|
|
|
|
message = "Unknown"
|
|
|
|
|
}
|
|
|
|
|
status32 := uint32(status)
|
|
|
|
|
response := &signalpb.WebSocketMessage{
|
|
|
|
|
Type: &msg_type,
|
|
|
|
|
Response: &signalpb.WebSocketResponseMessage{
|
|
|
|
|
Id: &id,
|
|
|
|
|
Message: &message,
|
|
|
|
|
Status: &status32,
|
|
|
|
|
Headers: []string{},
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
return response
|
|
|
|
|
}
|
|
|
|
|
|
2023-06-25 17:21:19 -04:00
|
|
|
func CreateWSRequest(method string, path string, body []byte, username *string, password *string) *signalpb.WebSocketRequestMessage {
|
2023-05-31 16:39:09 -04:00
|
|
|
request := &signalpb.WebSocketRequestMessage{
|
|
|
|
|
Verb: &method,
|
|
|
|
|
Path: &path,
|
|
|
|
|
Body: body,
|
|
|
|
|
}
|
2023-06-26 10:43:33 -04:00
|
|
|
request.Headers = []string{}
|
2023-06-29 21:36:09 -04:00
|
|
|
request.Headers = append(request.Headers, "content-type:application/json; charset=utf-8")
|
2023-05-31 16:39:09 -04:00
|
|
|
if username != nil && password != nil {
|
|
|
|
|
basicAuth := base64.StdEncoding.EncodeToString([]byte(*username + ":" + *password))
|
|
|
|
|
request.Headers = append(request.Headers, "authorization:Basic "+basicAuth)
|
|
|
|
|
}
|
|
|
|
|
return request
|
|
|
|
|
}
|