{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ViewPatterns #-}
{-# OPTIONS_GHC -fno-warn-unused-imports #-}
module Cardano.Logging.Prometheus.NetworkRun
( NetworkRunParams (..)
, TimeoutServer
, defaultRunParams
, mkTCPServerRunner
) where
import Cardano.Logging.Utils (threadLabelMe)
import Control.Concurrent (forkFinally, forkIO, threadDelay)
import Control.Concurrent.MVar
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBQueue
import qualified Control.Exception as E
import Control.Monad (forever, void, when)
import qualified Data.Foldable as F (sum)
import Data.Hashable (hash)
import qualified Data.IntMap.Strict as IM
import qualified Data.List.NonEmpty as NE
import Data.Maybe (fromMaybe)
import Network.Socket
import qualified System.TimeManager as T
-- | Parameters that control how the TCP server is run.
data NetworkRunParams = NetworkRunParams
  { runSocketTimeout    :: !Int
    -- ^ Inactivity timeout for a connection. The value is multiplied by
    --   1,000,000 before it is handed to the timeout manager, i.e. it is
    --   effectively given in seconds (assuming the manager counts in
    --   microseconds).
  , runSocketGraceful   :: !Int
    -- ^ Timeout passed to 'gracefulClose' when a connection is finished;
    --   a value of 0 (or less) selects a plain 'close' instead.
  , runRecvMaxSize      :: !Int
    -- ^ Not consulted by the accept loop in this module; presumably an upper
    --   bound on received bytes enforced by the server implementation
    --   (NOTE(review): confirm at the use site).
  , runRateLimit        :: !Double
    -- ^ Maximum number of accepted connections per second; 0.0 disables
    --   rate limiting.
  , runConnLimitGlobal  :: !Int
    -- ^ Maximum number of simultaneously served connections; 0 disables
    --   the global limit.
  , runConnLimitPerHost :: !Int
    -- ^ Maximum number of simultaneously served connections per peer host;
    --   0 disables the per-host limit.
  , runServerName       :: !String
    -- ^ Name used as a prefix when labelling the server's threads.
  }
-- | Default run parameters for a server with the given name.
--
-- The name only ends up in 'runServerName'; every other field gets a fixed
-- default value.
defaultRunParams :: String -> NetworkRunParams
defaultRunParams name = NetworkRunParams
  { runSocketTimeout    = 22
  , runSocketGraceful   = 1000
  , runRecvMaxSize      = 2048
  , runRateLimit        = 3.0
  , runConnLimitGlobal  = 12
  , runConnLimitPerHost = 4
  , runServerName       = name
  }
-- | A connection handler: it receives the run parameters, an action that
-- resets the connection's inactivity timeout, and the connected socket.
type TimeoutServer a
  =  NetworkRunParams
  -> IO ()      -- action that resets ("tickles") the inactivity timeout
  -> Socket     -- the accepted connection
  -> IO a
-- | Prepare a TCP server and return the action that runs it.
--
-- The listening socket is resolved, opened and bound eagerly, so failures to
-- bind surface when 'mkTCPServerRunner' itself is executed rather than when
-- the returned action is started. If no host name is given, the server binds
-- to @127.0.0.1@.
--
-- The returned action labels its thread, runs the accept loop and closes the
-- listening socket when the loop terminates for whatever reason.
mkTCPServerRunner
  :: NetworkRunParams
  -> Maybe HostName
  -> PortNumber
  -> TimeoutServer ()
  -> IO (IO ())
mkTCPServerRunner runParams mHost portNo server = do
  !sock <- openTCPServerSocket =<< resolve host portNo
  pure $ do
    threadLabelMe $ runServerName runParams ++ " server"
    runTCPServerWithSocket runParams sock server `E.finally` close sock
  where
    host :: HostName
    host = fromMaybe "127.0.0.1" mHost
-- | Accept loop of the server, running on an already listening socket.
--
-- Each iteration first waits for the rate limiter, then accepts a
-- connection. If the connection limiter admits the peer, the handler is run
-- in a freshly forked thread under a timeout handle; otherwise the
-- connection is closed right away.
runTCPServerWithSocket
  :: NetworkRunParams
  -> Socket
  -> TimeoutServer ()
  -> IO ()
runTCPServerWithSocket runParams@NetworkRunParams{..} sock server = do
  rateLimiter     <- mkRateLimiter runServerName runRateLimit
  ConnLimiter{..} <- mkConnLimiter runConnLimitGlobal runConnLimitPerHost
  T.withManager (runSocketTimeout * 1000000) $ \mgr -> forever $ do
    waitForLimiter rateLimiter
    -- Should anything fail before the handler thread owns the connection,
    -- the accepted socket is closed by the bracket.
    E.bracketOnError (accept sock) (close . fst) $ \(conn, peer) -> do
      noLimitHit <- canServeThisPeer peer
      if noLimitHit
        then void $ forkFinally
               (runServer mgr conn)
               -- 'E.finally' guarantees the peer's slot is given back even
               -- if closing the connection throws.
               (const $ gclose conn `E.finally` releasePeer peer)
        else close conn
  where
    -- Close gracefully only if a positive graceful-close timeout is set.
    gclose :: Socket -> IO ()
    gclose
      | runSocketGraceful > 0 = flip gracefulClose runSocketGraceful
      | otherwise             = close

    -- Run the handler under a timeout handle registered with the manager;
    -- the handler gets the action that resets that timeout.
    runServer :: T.Manager -> Socket -> IO ()
    runServer mgr conn = do
      threadLabelMe $ runServerName ++ " timeout server"
      T.withHandleKillThread mgr (return ()) $ \timeoutHandle ->
        server runParams (T.tickle timeoutHandle) conn
-- | Resolve host name and port to the first matching stream socket address.
--
-- Throws an 'IOError' if the lookup yields no address at all, instead of
-- crashing on an empty list.
resolve :: HostName -> PortNumber -> IO AddrInfo
resolve host portNo = do
  addrs <- getAddrInfo (Just hints) (Just host) (Just $ show portNo)
  case addrs of
    addr : _ -> pure addr
    []       -> E.throwIO $ userError $
      "resolve: no address found for " ++ host ++ ":" ++ show portNo
  where
    hints :: AddrInfo
    hints = defaultHints { addrSocketType = Stream, addrFlags = [AI_PASSIVE] }
-- | Open a listening TCP socket for the given address.
--
-- The socket gets @ReuseAddr@ set, is restricted to IPv6 only when the
-- address family is @AF_INET6@ (not on OpenBSD), is marked close-on-exec,
-- bound to the address and finally put into listening state with a backlog
-- of 1024. If any of these steps throws, the socket is closed again.
openTCPServerSocket :: AddrInfo -> IO Socket
openTCPServerSocket addr =
  E.bracketOnError (openSocket addr) close $ \sock -> do
    setSocketOption sock ReuseAddr 1
#if !defined(openbsd_HOST_OS)
    when (addrFamily addr == AF_INET6) $ setSocketOption sock IPv6Only 1
#endif
    withFdSocket sock setCloseOnExecIfNeeded
    bind sock $ addrAddress addr
    -- Listening happens inside the bracket so a failure here cannot leak
    -- the freshly opened socket.
    listen sock 1024
    return sock
-- | A rate limiter, represented by the action that blocks until the next
-- request may be served.
newtype RateLimiter = RateLimiter { waitForLimiter :: IO () }
-- | Create a rate limiter allowing at most the given number of requests per
-- second; the server name is only used to label the limiter's thread.
--
-- A rate that is not a positive finite number (0.0, negative, NaN or
-- infinite) disables limiting: 'waitForLimiter' then returns immediately and
-- no thread is forked. Previously a negative rate crashed with an arithmetic
-- underflow when sizing the queue.
--
-- Otherwise a background thread writes one token into a bounded queue per
-- time slot, and 'waitForLimiter' blocks until it can take a token. The
-- queue holds at most @ceiling rate@ tokens, which bounds the burst that can
-- accumulate while the server is idle.
mkRateLimiter :: String -> Double -> IO RateLimiter
mkRateLimiter serverName reqPerSecond
  | unlimited = pure $ RateLimiter (pure ())
  | otherwise = do
      tokens <- newTBQueueIO queueSize
      void . forkIO $ do
        threadLabelMe $ serverName ++ " rate limiter"
        forever $ do
          atomically $ writeTBQueue tokens ()
          threadDelay delay
      pure $ RateLimiter (atomically $ readTBQueue tokens)
  where
    -- @not (x > 0)@ is also True for NaN, unlike @x <= 0@.
    unlimited :: Bool
    unlimited = not (reqPerSecond > 0) || isInfinite reqPerSecond

    -- Length of one time slot in microseconds.
    delay :: Int
    delay = round $ 1000000 / reqPerSecond

    -- Capacity of the token queue; its type is whatever 'newTBQueueIO'
    -- expects, hence no signature.
    queueSize = ceiling reqPerSecond
-- | A connection limiter, represented by its two operations.
data ConnLimiter = ConnLimiter
  { canServeThisPeer :: SockAddr -> IO Bool
    -- ^ Try to reserve a connection slot for the peer; 'True' means the slot
    --   was reserved and the connection may be served.
  , releasePeer      :: SockAddr -> IO ()
    -- ^ Give back a slot previously reserved for the peer.
  }
-- | Create a connection limiter from a global and a per-host limit on
-- simultaneously served connections. A limit of 0 disables the respective
-- check; if both are 0, every peer is admitted and nothing is tracked.
--
-- The limiter keeps, behind an 'MVar', a map from peer id (see 'getPeerId')
-- to that peer's number of open connections.
mkConnLimiter :: Int -> Int -> IO ConnLimiter
mkConnLimiter 0 0 = pure $ ConnLimiter (const $ pure True) (const $ pure ())
mkConnLimiter global perHost = do
  lock <- newMVar IM.empty
  let
    -- Tentatively count the new connection. It is admitted only if the
    -- total actually grew (the per-host limit did not block the increment)
    -- and the new total stays within the global limit. On rejection the
    -- map is left untouched.
    tryReserve :: SockAddr -> IO Bool
    tryReserve (getPeerId -> peerId) =
      modifyMVar lock $ \conns ->
        let
          conns'   = IM.alter upsert peerId conns
          total'   = F.sum conns'
          canServe = didntHitGlobalLimit total' && total' > F.sum conns
        in pure (if canServe then conns' else conns, canServe)

    release :: SockAddr -> IO ()
    release (getPeerId -> peerId) =
      modifyMVar_ lock (pure . IM.alter removeOrDecrease peerId)

  pure ConnLimiter
    { canServeThisPeer = tryReserve
    , releasePeer      = release
    }
  where
    -- Given a host's current count: may it open one more connection?
    wontHitHostLimit :: Int -> Bool
    wontHitHostLimit
      | perHost == 0 = const True
      | otherwise    = (< perHost)

    -- Given the total including the new connection: is it within bounds?
    didntHitGlobalLimit :: Int -> Bool
    didntHitGlobalLimit
      | global == 0 = const True
      | otherwise   = (<= global)

    -- Increment a host's count unless that would exceed the per-host limit.
    upsert :: Maybe Int -> Maybe Int
    upsert Nothing  = Just 1
    upsert (Just n) = Just (if wontHitHostLimit n then n + 1 else n)

    -- Decrement a host's count, dropping the entry when it reaches zero.
    removeOrDecrease :: Maybe Int -> Maybe Int
    removeOrDecrease (Just n) | n > 1 = Just (n - 1)
    removeOrDecrease _                = Nothing
-- | Identify a peer by the hash of its host part: the IPv4 address, the IPv6
-- address, or the path of a Unix domain socket. Port numbers (and the IPv6
-- flow info and scope id) are ignored, so all connections from one host
-- share an id. Distinct hosts whose hashes collide share an id as well.
getPeerId :: SockAddr -> Int
getPeerId (SockAddrInet _ host)      = hash host
getPeerId (SockAddrInet6 _ _ host _) = hash host
getPeerId (SockAddrUnix path)        = hash path