Mirror of https://git.deuxfleurs.fr/Deuxfleurs/garage.git (synced 2026-05-15 05:36:53 -04:00)

Compare commits: main-v1...feat-delay (14 commits)
| SHA1 |
|---|
| 85aca61860 |
| 46ebfdba66 |
| ee8fa687ad |
| fa457328c8 |
| f34558af07 |
| d78e5f8a1b |
| 3172f875ae |
| 11a6417d11 |
| b0a9e007bd |
| 904548d1d1 |
| 6cc79bc696 |
| 60b3d28f93 |
| 7fddf0af9c |
| 78882f4040 |
22 changed files with 403 additions and 65 deletions
Cargo.lock (generated):

@@ -1470,6 +1470,7 @@ dependencies = [
 "tokio",
 "tokio-stream",
 "tokio-util 0.7.14",
 "tracing",
]

[[package]]
---

@@ -17,6 +17,7 @@ use opentelemetry::{
    Context,
};

use garage_net::endpoint::RpcInFlightLimiter;
use garage_net::stream::{read_stream_to_end, stream_asyncread, ByteStream};

use garage_db as db;

@@ -295,6 +296,7 @@ impl BlockManager {
            &node_id,
            BlockRpc::GetBlock(*hash, order_tag),
            priority,
            RpcInFlightLimiter::TableWrite,
        );
        tokio::select! {
            res = rpc => {
---

@@ -13,6 +13,8 @@ use serde::{Deserialize, Serialize};

use format_table::format_table_to_string;

use garage_net::endpoint::RpcInFlightLimiter;

use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error as GarageError;

@@ -118,6 +120,7 @@ impl AdminRpcHandler {
                &node,
                AdminRpc::LaunchRepair(opt_to_send.clone()),
                PRIO_NORMAL,
                RpcInFlightLimiter::NoLimit,
            )
            .await;
        if !matches!(resp, Ok(Ok(_))) {

@@ -164,7 +167,12 @@ impl AdminRpcHandler {
            let node_id = (*node).into();
            match self
                .endpoint
                .call(&node_id, AdminRpc::Stats(opt), PRIO_NORMAL)
                .call(
                    &node_id,
                    AdminRpc::Stats(opt),
                    PRIO_NORMAL,
                    RpcInFlightLimiter::NoLimit,
                )
                .await
            {
                Ok(Ok(AdminRpc::Ok(s))) => writeln!(&mut ret, "{}", s).unwrap(),

@@ -407,6 +415,7 @@ impl AdminRpcHandler {
                    variable: variable.clone(),
                }),
                PRIO_NORMAL,
                RpcInFlightLimiter::NoLimit,
            )
            .await??
        {

@@ -456,6 +465,7 @@ impl AdminRpcHandler {
                    value: value.to_string(),
                }),
                PRIO_NORMAL,
                RpcInFlightLimiter::NoLimit,
            )
            .await??
        {

@@ -488,6 +498,7 @@ impl AdminRpcHandler {
                    &to,
                    AdminRpc::MetaOperation(MetaOperation::Snapshot { all: false }),
                    PRIO_NORMAL,
                    RpcInFlightLimiter::NoLimit,
                )
                .await?
            }))
---

@@ -2,6 +2,7 @@ use std::collections::{HashMap, HashSet};
use std::time::Duration;

use format_table::format_table;
use garage_net::endpoint::RpcInFlightLimiter;
use garage_util::error::*;

use garage_rpc::layout::*;

@@ -200,7 +201,12 @@ pub async fn cmd_connect(
    args: ConnectNodeOpt,
) -> Result<(), Error> {
    match rpc_cli
        .call(&rpc_host, SystemRpc::Connect(args.node), PRIO_NORMAL)
        .call(
            &rpc_host,
            SystemRpc::Connect(args.node),
            PRIO_NORMAL,
            RpcInFlightLimiter::NoLimit,
        )
        .await??
    {
        SystemRpc::Ok => {

@@ -216,7 +222,10 @@ pub async fn cmd_admin(
    rpc_host: NodeID,
    args: AdminRpc,
) -> Result<(), HelperError> {
    match rpc_cli.call(&rpc_host, args, PRIO_NORMAL).await?? {
    match rpc_cli
        .call(&rpc_host, args, PRIO_NORMAL, RpcInFlightLimiter::NoLimit)
        .await??
    {
        AdminRpc::Ok(msg) => {
            println!("{}", msg);
        }

@@ -271,7 +280,12 @@ pub async fn fetch_status(
    rpc_host: NodeID,
) -> Result<Vec<KnownNodeInfo>, Error> {
    match rpc_cli
        .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL)
        .call(
            &rpc_host,
            SystemRpc::GetKnownNodes,
            PRIO_NORMAL,
            RpcInFlightLimiter::NoLimit,
        )
        .await??
    {
        SystemRpc::ReturnKnownNodes(nodes) => Ok(nodes),
---

@@ -1,6 +1,7 @@
use bytesize::ByteSize;

use format_table::format_table;
use garage_net::endpoint::RpcInFlightLimiter;
use garage_util::crdt::Crdt;
use garage_util::error::*;

@@ -45,7 +46,12 @@ pub async fn cmd_assign_role(
    args: AssignRoleOpt,
) -> Result<(), Error> {
    let status = match rpc_cli
        .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL)
        .call(
            &rpc_host,
            SystemRpc::GetKnownNodes,
            PRIO_NORMAL,
            RpcInFlightLimiter::NoLimit,
        )
        .await??
    {
        SystemRpc::ReturnKnownNodes(nodes) => nodes,

@@ -475,7 +481,12 @@ pub async fn fetch_layout(
    rpc_host: NodeID,
) -> Result<LayoutHistory, Error> {
    match rpc_cli
        .call(&rpc_host, SystemRpc::PullClusterLayout, PRIO_NORMAL)
        .call(
            &rpc_host,
            SystemRpc::PullClusterLayout,
            PRIO_NORMAL,
            RpcInFlightLimiter::NoLimit,
        )
        .await??
    {
        SystemRpc::AdvertiseClusterLayout(t) => Ok(t),

@@ -493,6 +504,7 @@ pub async fn send_layout(
            &rpc_host,
            SystemRpc::AdvertiseClusterLayout(layout),
            PRIO_NORMAL,
            RpcInFlightLimiter::NoLimit,
        )
        .await??;
    Ok(())
---

@@ -244,7 +244,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
    // Generate a temporary keypair for our RPC client
    let (_pk, sk) = sodiumoxide::crypto::sign::ed25519::gen_keypair();

    let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, sk, None);
    let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, sk, None, None);

    // Find and parse the address of the target host
    let (id, addr, is_default_addr) = if let Some(h) = opt.rpc_host {
---

@@ -175,7 +175,13 @@ impl Garage {

        // ---- admin tables ----
        info!("Initialize bucket_table...");
        let bucket_table = Table::new(BucketTable, control_rep_param.clone(), system.clone(), &db);
        let bucket_table = Table::new(
            BucketTable,
            control_rep_param.clone(),
            system.clone(),
            &db,
            &config.experimental.merkle_backpressure,
        );

        info!("Initialize bucket_alias_table...");
        let bucket_alias_table = Table::new(

@@ -183,9 +189,16 @@ impl Garage {
            control_rep_param.clone(),
            system.clone(),
            &db,
            &config.experimental.merkle_backpressure,
        );
        info!("Initialize key_table_table...");
        let key_table = Table::new(KeyTable, control_rep_param, system.clone(), &db);
        let key_table = Table::new(
            KeyTable,
            control_rep_param,
            system.clone(),
            &db,
            &config.experimental.merkle_backpressure,
        );

        // ---- S3 tables ----
        info!("Initialize block_ref_table...");

@@ -196,6 +209,7 @@ impl Garage {
            meta_rep_param.clone(),
            system.clone(),
            &db,
            &config.experimental.merkle_backpressure,
        );

        info!("Initialize version_table...");

@@ -206,10 +220,12 @@ impl Garage {
            meta_rep_param.clone(),
            system.clone(),
            &db,
            &config.experimental.merkle_backpressure,
        );

        info!("Initialize multipart upload counter table...");
        let mpu_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db);
        let mpu_counter_table =
            IndexCounter::new(system.clone(), meta_rep_param.clone(), &db, &config);

        info!("Initialize multipart upload table...");
        let mpu_table = Table::new(

@@ -220,10 +236,12 @@ impl Garage {
            meta_rep_param.clone(),
            system.clone(),
            &db,
            &config.experimental.merkle_backpressure,
        );

        info!("Initialize object counter table...");
        let object_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db);
        let object_counter_table =
            IndexCounter::new(system.clone(), meta_rep_param.clone(), &db, &config);

        info!("Initialize object_table...");
        #[allow(clippy::redundant_clone)]

@@ -236,6 +254,7 @@ impl Garage {
            meta_rep_param.clone(),
            system.clone(),
            &db,
            &config.experimental.merkle_backpressure,
        );

        info!("Load lifecycle worker state...");

@@ -245,7 +264,7 @@ impl Garage {

        // ---- K2V ----
        #[cfg(feature = "k2v")]
        let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param);
        let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param, &config);

        // ---- setup block refcount recalculation ----
        // this function can be used to fix inconsistencies in the RC table

@@ -335,9 +354,14 @@ impl Garage {

#[cfg(feature = "k2v")]
impl GarageK2V {
    fn new(system: Arc<System>, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self {
    fn new(
        system: Arc<System>,
        db: &db::Db,
        meta_rep_param: TableShardedReplication,
        config: &Config,
    ) -> Self {
        info!("Initialize K2V counter table...");
        let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db);
        let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db, config);

        info!("Initialize K2V subscription manager...");
        let subscriptions = Arc::new(SubscriptionManager::new());

@@ -351,6 +375,7 @@ impl GarageK2V {
            meta_rep_param,
            system.clone(),
            db,
            &config.experimental.merkle_backpressure,
        );

        info!("Initialize K2V RPC handler...");
---

@@ -10,6 +10,7 @@ use garage_db as db;
use garage_rpc::layout::LayoutHelper;
use garage_rpc::system::System;
use garage_util::background::BackgroundRunner;
use garage_util::config::Config;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::migrate::Migrate;

@@ -173,6 +174,7 @@ impl<T: CountedItem> IndexCounter<T> {
        system: Arc<System>,
        replication: TableShardedReplication,
        db: &db::Db,
        config: &Config,
    ) -> Arc<Self> {
        Arc::new(Self {
            this_node: system.id,

@@ -186,6 +188,7 @@ impl<T: CountedItem> IndexCounter<T> {
                replication,
                system,
                db,
                &config.experimental.merkle_backpressure,
            ),
        })
    }
---

@@ -39,6 +39,7 @@ kuska-handshake.workspace = true

opentelemetry = { workspace = true, optional = true }
opentelemetry-contrib = { workspace = true, optional = true }
tracing.workspace = true

[dev-dependencies]
pretty_env_logger.workspace = true
---

@@ -4,6 +4,7 @@ use std::pin::Pin;
use std::sync::atomic::{self, AtomicU32};
use std::sync::{Arc, Mutex};
use std::task::Poll;
use tracing::*;

use arc_swap::ArcSwapOption;
use bytes::Bytes;

@@ -14,7 +15,7 @@ use futures::Stream;
use kuska_handshake::async_std::{handshake_client, BoxStream};
use tokio::net::TcpStream;
use tokio::select;
use tokio::sync::{mpsc, oneshot, watch};
use tokio::sync::{mpsc, oneshot, watch, Semaphore};
use tokio_util::compat::*;

#[cfg(feature = "telemetry")]

@@ -25,6 +26,7 @@ use opentelemetry::{
#[cfg(feature = "telemetry")]
use opentelemetry_contrib::trace::propagator::binary::*;

use crate::endpoint::RpcInFlightLimiter;
use crate::error::*;
use crate::message::*;
use crate::netapp::*;

@@ -41,6 +43,7 @@ pub(crate) struct ClientConn {

    next_query_number: AtomicU32,
    inflight: Mutex<HashMap<RequestID, oneshot::Sender<ByteStream>>>,
    rpc_table_write_inflight_limiter: Option<Semaphore>,
}

impl ClientConn {

@@ -98,8 +101,14 @@ impl ClientConn {
            next_query_number: AtomicU32::from(RequestID::default()),
            query_send: ArcSwapOption::new(Some(Arc::new(query_send))),
            inflight: Mutex::new(HashMap::new()),
            rpc_table_write_inflight_limiter: netapp.max_in_flight_table_write.map(Semaphore::new),
        });

        info!(
            "Created conn with table write limit set to {}",
            netapp.max_in_flight_table_write.unwrap_or(0)
        );

        netapp.connected_as_client(peer_id, conn.clone());

        let debug_name = format!("CLI {}", hex::encode(&peer_id[..8]));

@@ -144,10 +153,21 @@ impl ClientConn {
        req: Req<T>,
        path: &str,
        prio: RequestPriority,
        limiter: RpcInFlightLimiter,
    ) -> Result<Resp<T>, Error>
    where
        T: Message,
    {
        let _permit = match (limiter, &self.rpc_table_write_inflight_limiter) {
            (RpcInFlightLimiter::TableWrite, Some(sem)) => {
                info!(
                    "Available RPC table write slots: {}",
                    sem.available_permits()
                );
                Some(sem.acquire().await.unwrap())
            }
            _ => None,
        };
        let query_send = self.query_send.load_full().ok_or(Error::ConnectionClosed)?;

        let id = self

@@ -212,6 +232,7 @@ impl ClientConn {
        let stream = Box::pin(canceller.for_stream(stream));

        let resp_enc = RespEnc::decode(stream).await?;
        drop(_permit);
        debug!("client: got response to request {} (path {})", id, path);
        Resp::from_enc(resp_enc)
    }
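The ClientConn changes above are the core of the new in-flight limiter: an optional tokio Semaphore attached to each client connection, from which a permit is taken before a TableWrite RPC is sent and released only once the response has been decoded. Below is a minimal standalone sketch of that pattern, outside of garage_net; the Conn and InFlightLimiter names are illustrative and not part of the Garage API.

use tokio::sync::Semaphore;

#[derive(Debug, Copy, Clone, Default)]
enum InFlightLimiter {
    #[default]
    NoLimit,
    TableWrite,
}

struct Conn {
    // None means no limit was configured for this connection.
    table_write_limiter: Option<Semaphore>,
}

impl Conn {
    async fn call(&self, limiter: InFlightLimiter) -> &'static str {
        // Take a permit only for table-write RPCs on a limited connection;
        // the permit is released when `_permit` is dropped, i.e. after the
        // response has been received.
        let _permit = match (limiter, &self.table_write_limiter) {
            (InFlightLimiter::TableWrite, Some(sem)) => Some(sem.acquire().await.unwrap()),
            _ => None,
        };
        // ... send the request and await the response here ...
        "response"
    }
}

#[tokio::main]
async fn main() {
    let conn = Conn {
        table_write_limiter: Some(Semaphore::new(64)),
    };
    // At most 64 table-write calls can hold a permit at the same time.
    let _ = conn.call(InFlightLimiter::TableWrite).await;
    let _ = conn.call(InFlightLimiter::NoLimit).await;
}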
---

@@ -57,6 +57,13 @@ where
    }
}

#[derive(Debug, Copy, Clone, Default)]
pub enum RpcInFlightLimiter {
    #[default]
    NoLimit,
    TableWrite,
}

// ----

/// This struct represents an endpoint for message of type `M`.

@@ -114,6 +121,7 @@ where
        target: &NodeID,
        req: T,
        prio: RequestPriority,
        limiter: RpcInFlightLimiter,
    ) -> Result<Resp<M>, Error>
    where
        T: IntoReq<M>,

@@ -136,7 +144,10 @@ where
                "Not connected: {}",
                hex::encode(&target[..8])
            ))),
            Some(c) => c.call(req.into_req()?, self.path.as_str(), prio).await,
            Some(c) => {
                c.call(req.into_req()?, self.path.as_str(), prio, limiter)
                    .await
            }
        }
    }
}

@@ -149,8 +160,12 @@ where
        target: &NodeID,
        req: M,
        prio: RequestPriority,
        limiter: RpcInFlightLimiter,
    ) -> Result<<M as Message>::Response, Error> {
        Ok(self.call_streaming(target, req, prio).await?.into_msg())
        Ok(self
            .call_streaming(target, req, prio, limiter)
            .await?
            .into_msg())
    }
}
---

@@ -74,6 +74,8 @@ pub struct NetApp {
    pub id: NodeID,
    /// Private key associated with our peer ID
    pub privkey: ed25519::SecretKey,
    /// Config related to netapp
    pub(crate) max_in_flight_table_write: Option<usize>,

    pub(crate) server_conns: RwLock<HashMap<NodeID, Arc<ServerConn>>>,
    pub(crate) client_conns: RwLock<HashMap<NodeID, Arc<ClientConn>>>,

@@ -101,6 +103,7 @@ impl NetApp {
        netid: auth::Key,
        privkey: ed25519::SecretKey,
        bind_outgoing_to: Option<IpAddr>,
        max_in_flight_table_write: Option<usize>,
    ) -> Arc<Self> {
        let mut version_tag = [0u8; 16];
        version_tag[0..8].copy_from_slice(&u64::to_be_bytes(NETAPP_VERSION_TAG)[..]);

@@ -114,6 +117,7 @@ impl NetApp {
            netid,
            id,
            privkey,
            max_in_flight_table_write,
            server_conns: RwLock::new(HashMap::new()),
            client_conns: RwLock::new(HashMap::new()),
            endpoints: RwLock::new(HashMap::new()),

@@ -427,6 +431,7 @@ impl NetApp {
                    server_port,
                },
                PRIO_NORMAL,
                RpcInFlightLimiter::NoLimit,
            )
            .await
            .map(|_| ())
---

@@ -406,7 +406,7 @@ impl PeeringManager {
            ping_time
        );
        let ping_response = select! {
            r = self.ping_endpoint.call(&id, ping_msg, PRIO_HIGH) => r,
            r = self.ping_endpoint.call(&id, ping_msg, PRIO_HIGH, RpcInFlightLimiter::NoLimit) => r,
            _ = tokio::time::sleep(ping_timeout) => Err(Error::Message("Ping timeout".into())),
        };

@@ -458,7 +458,12 @@ impl PeeringManager {
        let pex_message = PeerListMessage { list: peer_list };
        match self
            .peer_list_endpoint
            .call(id, pex_message, PRIO_BACKGROUND)
            .call(
                id,
                pex_message,
                PRIO_BACKGROUND,
                RpcInFlightLimiter::NoLimit,
            )
            .await
        {
            Err(e) => warn!("Error doing peer exchange: {}", e),
---

@@ -6,6 +6,7 @@ use std::time::Duration;
use futures::future::join_all;
use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt;
use garage_net::endpoint::RpcInFlightLimiter;
use tokio::select;

use opentelemetry::KeyValue;

@@ -44,6 +45,8 @@ pub struct RequestStrategy<T> {
    rs_timeout: Timeout,
    /// Data to drop when everything completes
    rs_drop_on_complete: T,
    /// RPC In Flight Limiter
    rs_inflight_limiter: RpcInFlightLimiter,
}

#[derive(Copy, Clone)]

@@ -61,6 +64,7 @@ impl Clone for RequestStrategy<()> {
            rs_priority: self.rs_priority,
            rs_timeout: self.rs_timeout,
            rs_drop_on_complete: (),
            rs_inflight_limiter: self.rs_inflight_limiter,
        }
    }
}

@@ -74,6 +78,7 @@ impl RequestStrategy<()> {
            rs_priority: prio,
            rs_timeout: Timeout::Default,
            rs_drop_on_complete: (),
            rs_inflight_limiter: RpcInFlightLimiter::NoLimit,
        }
    }
    /// Add an item to be dropped on completion

@@ -84,6 +89,7 @@ impl RequestStrategy<()> {
            rs_priority: self.rs_priority,
            rs_timeout: self.rs_timeout,
            rs_drop_on_complete: drop_on_complete,
            rs_inflight_limiter: RpcInFlightLimiter::NoLimit,
        }
    }
}

@@ -109,6 +115,10 @@ impl<T> RequestStrategy<T> {
        self.rs_timeout = Timeout::Custom(timeout);
        self
    }
    pub fn with_write_limiter(mut self) -> Self {
        self.rs_inflight_limiter = RpcInFlightLimiter::TableWrite;
        self
    }
    /// Extract drop_on_complete item
    fn extract_drop_on_complete(self) -> (RequestStrategy<()>, T) {
        (

@@ -118,6 +128,7 @@ impl<T> RequestStrategy<T> {
                rs_priority: self.rs_priority,
                rs_timeout: self.rs_timeout,
                rs_drop_on_complete: (),
                rs_inflight_limiter: self.rs_inflight_limiter,
            },
            self.rs_drop_on_complete,
        )

@@ -185,7 +196,7 @@ impl RpcHelper {

        let node_id = to.into();
        let rpc_call = endpoint
            .call_streaming(&node_id, msg, strat.rs_priority)
            .call_streaming(&node_id, msg, strat.rs_priority, strat.rs_inflight_limiter)
            .with_context(Context::current_with_span(span))
            .record_duration(&self.0.metrics.rpc_duration, &metric_tags);
---

@@ -21,7 +21,7 @@ use garage_net::{NetApp, NetworkKey, NodeID, NodeKey};

#[cfg(feature = "kubernetes-discovery")]
use garage_util::config::KubernetesDiscoveryConfig;
use garage_util::config::{Config, DataDirEnum};
use garage_util::config::{Config, DataDirEnum, RpcInFlightLimiterEnum};
use garage_util::data::*;
use garage_util::error::*;
use garage_util::persister::Persister;

@@ -256,7 +256,17 @@ impl System {
        let bind_outgoing_to = Some(config)
            .filter(|x| x.rpc_bind_outgoing)
            .map(|x| x.rpc_bind_addr.ip());
        let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key, bind_outgoing_to);
        let maybe_max_table_write = match &config.experimental.rpc_in_flight_limiters {
            RpcInFlightLimiterEnum::None => None,
            RpcInFlightLimiterEnum::FixedSize(v) => Some(v.max_table_write),
        };
        let netapp = NetApp::new(
            GARAGE_VERSION_TAG,
            network_key,
            node_key,
            bind_outgoing_to,
            maybe_max_table_write,
        );
        let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());

        // ---- setup netapp public listener and full mesh peering strategy ----
---

@@ -3,10 +3,12 @@ use std::convert::TryInto;
use std::sync::Arc;

use serde_bytes::ByteBuf;
use tokio::sync::Notify;
use tokio::sync::SemaphorePermit;
use tokio::sync::{Notify, Semaphore};

use garage_db as db;

use garage_util::config::MerkleBackpressureEnum;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::migrate::Migrate;

@@ -20,6 +22,67 @@ use crate::replication::*;
use crate::schema::*;
use crate::util::*;

pub(crate) struct MerkleTodo {
    merkle_todo: db::Tree,
    merkle_todo_notify: Notify,
    merkle_todo_bounded_queue: Option<Arc<Semaphore>>,
}
impl Clone for MerkleTodo {
    fn clone(&self) -> Self {
        Self {
            merkle_todo: self.merkle_todo.clone(),
            merkle_todo_notify: Notify::new(),
            merkle_todo_bounded_queue: self.merkle_todo_bounded_queue.clone(),
        }
    }
}
impl MerkleTodo {
    fn new<F: TableSchema>(db: &db::Db, config: &MerkleBackpressureEnum) -> Self {
        let merkle_todo = db
            .open_tree(format!("{}:merkle_todo", F::TABLE_NAME))
            .expect("Unable to open DB Merkle TODO tree");

        let merkle_todo_bounded_queue = match config {
            MerkleBackpressureEnum::None => None,
            MerkleBackpressureEnum::FixedQueue(p) => {
                Some(Arc::new(Semaphore::new(p.max_queue_size)))
            }
        };

        Self {
            merkle_todo,
            merkle_todo_notify: Notify::new(),
            merkle_todo_bounded_queue,
        }
    }

    pub(crate) fn len(&self) -> Result<usize, db::Error> {
        self.merkle_todo.len()
    }

    pub(crate) async fn with_db<F: FnOnce(&db::Tree, SemaphorePermit)>(&self, f: F) {
        let bounded = self
            .merkle_todo_bounded_queue
            .clone()
            .unwrap_or(Arc::new(Semaphore::new(1)));
        let permit = bounded.acquire().await.unwrap();
        f(&self.merkle_todo, permit);
    }

    pub(crate) fn appended(&self, permit: SemaphorePermit) {
        permit.forget();
        self.merkle_todo_notify.notify_one();
    }

    pub(crate) fn processed(&self) {
        let bounded = self
            .merkle_todo_bounded_queue
            .clone()
            .unwrap_or(Arc::new(Semaphore::new(1)));
        bounded.add_permits(1);
    }
}

pub struct TableData<F: TableSchema, R: TableReplication> {
    system: Arc<System>,

@@ -29,8 +92,7 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
    pub store: db::Tree,

    pub(crate) merkle_tree: db::Tree,
    pub(crate) merkle_todo: db::Tree,
    pub(crate) merkle_todo_notify: Notify,
    pub(crate) merkle_todo: MerkleTodo,

    pub(crate) insert_queue: db::Tree,
    pub(crate) insert_queue_notify: Arc<Notify>,

@@ -38,10 +100,18 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
    pub(crate) gc_todo: db::Tree,

    pub(crate) metrics: TableMetrics,

    pub(crate) config: MerkleBackpressureEnum,
}

impl<F: TableSchema, R: TableReplication> TableData<F, R> {
    pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> {
    pub fn new(
        system: Arc<System>,
        instance: F,
        replication: R,
        db: &db::Db,
        config: &MerkleBackpressureEnum,
    ) -> Arc<Self> {
        let store = db
            .open_tree(format!("{}:table", F::TABLE_NAME))
            .expect("Unable to open DB tree");

@@ -49,9 +119,8 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
        let merkle_tree = db
            .open_tree(format!("{}:merkle_tree", F::TABLE_NAME))
            .expect("Unable to open DB Merkle tree tree");
        let merkle_todo = db
            .open_tree(format!("{}:merkle_todo", F::TABLE_NAME))
            .expect("Unable to open DB Merkle TODO tree");

        let merkle_todo = MerkleTodo::new::<F>(db, config);

        let insert_queue = db
            .open_tree(format!("{}:insert_queue", F::TABLE_NAME))

@@ -76,11 +145,11 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
            store,
            merkle_tree,
            merkle_todo,
            merkle_todo_notify: Notify::new(),
            insert_queue,
            insert_queue_notify: Arc::new(Notify::new()),
            gc_todo,
            metrics,
            config: config.clone(),
        })
    }

@@ -167,6 +236,8 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
    // - When an entry is modified or deleted, add it to the merkle updater's todo list.
    //   This has to be done atomically with the modification for the merkle updater
    //   to maintain consistency. The merkle updater must then be notified with todo_notify.
    //   Also to avoid overloading the merkle updater, you need to sleep a given amount of
    //   time to enable backpressure (ie. slow down clients).
    // - When an entry is updated to be a tombstone, add it to the gc_todo tree

    pub(crate) fn update_many<T: Borrow<ByteBuf>>(&self, entries: &[T]) -> Result<(), Error> {

@@ -201,6 +272,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
    ) -> Result<Option<F::E>, Error> {
        let tree_key = self.tree_key(partition_key, sort_key);

        // transaction begins
        let changed = self.store.db().transaction(|tx| {
            let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? {
                Some(old_bytes) => {

@@ -238,12 +310,22 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
                Ok(None)
            }
        })?;
        // transaction ends

        if let Some((new_entry, new_bytes_hash)) = changed {
            self.metrics.internal_update_counter.add(1);
        // early return if nothing changed
        let (new_entry, new_bytes_hash) = match changed {
            Some((e, b)) => (e, b),
            None => {
                let maybe_bound = self.merkle_todo_bounded_queue.clone();
                if let Some(b) = &maybe_bound {
                    b.add_permits(1);
                }
                return Ok(None);
            }
        };

        // Handle GC in case of tombstone
        let is_tombstone = new_entry.is_tombstone();
            self.merkle_todo_notify.notify_one();
            if is_tombstone {
                // We are only responsible for GC'ing this item if we are the
                // "leader" of the partition, i.e. the first node in the

@@ -259,10 +341,13 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
            }
        }

        // Collect metrics
        self.metrics.internal_update_counter.add(1);

        // Synchronize with the Merkle Worker
        self.merkle_todo_notify.notify_one(); // Wake-up it

        Ok(Some(new_entry))
        } else {
            Ok(None)
        }
    }

    pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {

@@ -282,10 +367,16 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
            _ => Ok(false),
        })?;

        if removed {
        if !removed {
            let maybe_bound = self.merkle_todo_bounded_queue.clone();
            if let Some(b) = &maybe_bound {
                b.add_permits(1);
            }
            return Ok(false);
        }

        self.metrics.internal_delete_counter.add(1);
        self.merkle_todo_notify.notify_one();
        }
        Ok(removed)
    }

@@ -310,11 +401,18 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
            _ => Ok(false),
        })?;

        if removed {
        if !removed {
            let maybe_bound = self.merkle_todo_bounded_queue.clone();
            if let Some(b) = &maybe_bound {
                b.add_permits(1);
            }
            return Ok(false);
        }

        self.metrics.internal_delete_counter.add(1);
        self.merkle_todo_notify.notify_one();
        }
        Ok(removed)

        Ok(true)
    }

    // ---- Insert queue functions ----
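The MerkleTodo type introduced above turns the per-table merkle_todo tree into a bounded queue: writers must take a semaphore permit before appending and forget it, and the Merkle updater (or an early-return path) gives a permit back with add_permits(1) once an entry has been handled, which is what produces the backpressure on clients. The following self-contained sketch shows the same permit-forget / add_permits pattern in isolation; the BoundedTodo type and its methods are illustrative, not Garage code.

use std::collections::VecDeque;
use std::sync::Mutex;
use tokio::sync::Semaphore;

// A queue whose length is capped by a semaphore: producers take a permit
// before pushing and "forget" it (so dropping it does not free the slot),
// and the consumer gives a permit back only once an item has been processed.
struct BoundedTodo {
    items: Mutex<VecDeque<u64>>,
    slots: Semaphore,
}

impl BoundedTodo {
    fn new(max_len: usize) -> Self {
        Self {
            items: Mutex::new(VecDeque::new()),
            slots: Semaphore::new(max_len),
        }
    }

    // Waits when the queue is full: this is the backpressure on writers.
    async fn push(&self, item: u64) {
        let permit = self.slots.acquire().await.unwrap();
        self.items.lock().unwrap().push_back(item);
        permit.forget(); // keep the slot occupied until processed() is called
    }

    fn pop(&self) -> Option<u64> {
        self.items.lock().unwrap().pop_front()
    }

    // Called by the worker once an item has been handled; frees one slot.
    fn processed(&self) {
        self.slots.add_permits(1);
    }
}

#[tokio::main]
async fn main() {
    let todo = BoundedTodo::new(2);
    todo.push(1).await;
    todo.push(2).await;
    // A third push would now wait until the worker calls processed().
    while let Some(x) = todo.pop() {
        println!("processed {x}");
        todo.processed();
    }
}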
---

@@ -262,7 +262,8 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
        // GC has been successful for all of these entries.
        // We now remove them all from our local table and from the GC todo list.
        for item in items {
            self.data
            let _is_removed = self
                .data
                .delete_if_equal_hash(&item.key[..], item.value_hash)
                .err_context("GC: local delete tombstones")?;
            item.remove_if_equal(&self.data.gc_todo)

@@ -275,14 +276,21 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {

impl<F: TableSchema, R: TableReplication> EndpointHandler<GcRpc> for TableGc<F, R> {
    async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> Result<GcRpc, Error> {
        let maybe_bounded = self.data.merkle_todo_bounded_queue.clone();
        match message {
            GcRpc::Update(items) => {
                if let Some(b) = maybe_bounded {
                    b.acquire_many(items.len() as u32).await.unwrap().forget();
                }
                self.data.update_many(items)?;
                Ok(GcRpc::Ok)
            }
            GcRpc::DeleteIfEqualHash(items) => {
                if let Some(b) = maybe_bounded {
                    b.acquire_many(items.len() as u32).await.unwrap().forget();
                }
                for (key, vhash) in items.iter() {
                    self.data.delete_if_equal_hash(&key[..], *vhash)?;
                    let _is_removed = self.data.delete_if_equal_hash(&key[..], *vhash)?;
                }
                Ok(GcRpc::Ok)
            }

@@ -329,7 +337,6 @@ impl<F: TableSchema, R: TableReplication> Worker for GcWorker<F, R> {
    }

    async fn wait_for_work(&mut self) -> WorkerState {
        tokio::time::sleep(self.wait_delay).await;
        WorkerState::Busy
    }
}
---

@@ -9,6 +9,7 @@ use tokio::sync::watch;
use garage_db as db;

use garage_util::background::*;
use garage_util::config::MerkleBackpressureEnum;
use garage_util::data::*;
use garage_util::encode::{nonversioned_decode, nonversioned_encode};
use garage_util::error::Error;

@@ -70,6 +71,15 @@ impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
    pub(crate) fn new(data: Arc<TableData<F, R>>) -> Arc<Self> {
        let empty_node_hash = blake2sum(&nonversioned_encode(&MerkleNode::Empty).unwrap()[..]);

        // @FIXME: move in worker
        match &data.config {
            MerkleBackpressureEnum::None => info!("Merkle Backpressure is not activated"),
            MerkleBackpressureEnum::FixedQueue(v) => info!(
                "Merkle backpressure with a fixed queue size (qlen={}) is activated.",
                v.max_queue_size
            ),
        }

        Arc::new(Self {
            data,
            empty_node_hash,

@@ -125,6 +135,11 @@ impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
                k
            );
        }

        let maybe_bound = self.data.merkle_todo_bounded_queue.clone();
        if let Some(b) = &maybe_bound {
            b.add_permits(1);
        }
        Ok(())
    }
---

@@ -1,12 +1,16 @@
use opentelemetry::{global, metrics::*, KeyValue};
use std::convert::TryInto;

use garage_db as db;

use crate::data::MerkleTodo;

/// TableMetrics reference all counter used for metrics
pub struct TableMetrics {
    pub(crate) _table_size: ValueObserver<u64>,
    pub(crate) _merkle_tree_size: ValueObserver<u64>,
    pub(crate) _merkle_todo_len: ValueObserver<u64>,
    pub(crate) _merkle_todo_bounded_queue_free: ValueObserver<u64>,
    pub(crate) _gc_todo_len: ValueObserver<u64>,

    pub(crate) get_request_counter: BoundCounter<u64>,

@@ -25,7 +29,7 @@ impl TableMetrics {
        table_name: &'static str,
        store: db::Tree,
        merkle_tree: db::Tree,
        merkle_todo: db::Tree,
        merkle_todo: MerkleTodo,
        gc_todo: db::Tree,
    ) -> Self {
        let meter = global::meter(table_name);

@@ -72,6 +76,20 @@ impl TableMetrics {
                )
                .with_description("Merkle tree updater TODO queue length")
                .init(),
            _merkle_todo_bounded_queue_free: meter
                .u64_value_observer(
                    "table.merkle_todo_bounded_queue_free",
                    move |observer| {
                        let maybe_bounded = merkle_todo_bounded_queue.clone();
                        let free: u64 = match &maybe_bounded {
                            Some(v) => v.available_permits().try_into().unwrap(),
                            None => 0,
                        };
                        observer.observe(free, &[KeyValue::new("table_name", table_name)])
                    }
                )
                .with_description("Merkle TODO queue free slots")
                .init(),
            _gc_todo_len: meter
                .u64_value_observer(
                    "table.gc_todo_queue_length",
---

@@ -244,8 +244,14 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {

        // All remote nodes have written those items, now we can delete them locally
        let mut not_removed = 0;
        let maybe_bounded = self.data.merkle_todo_bounded_queue.clone();
        if let Some(b) = maybe_bounded {
            b.acquire_many(items.len() as u32).await.unwrap().forget();
        }

        for (k, v) in items.iter() {
            if !self.data.delete_if_equal(&k[..], &v[..])? {
            let removed = self.data.delete_if_equal(&k[..], &v[..])?;
            if !removed {
                not_removed += 1;
            }
        }
---

@@ -14,6 +14,7 @@ use opentelemetry::{
use garage_db as db;

use garage_util::background::BackgroundRunner;
use garage_util::config::MerkleBackpressureEnum;
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::metrics::RecordDuration;

@@ -68,12 +69,18 @@ impl<F: TableSchema> Rpc for TableRpc<F> {
impl<F: TableSchema, R: TableReplication> Table<F, R> {
    // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) ===============

    pub fn new(instance: F, replication: R, system: Arc<System>, db: &db::Db) -> Arc<Self> {
    pub fn new(
        instance: F,
        replication: R,
        system: Arc<System>,
        db: &db::Db,
        config: &MerkleBackpressureEnum,
    ) -> Arc<Self> {
        let endpoint = system
            .netapp
            .endpoint(format!("garage_table/table.rs/Rpc:{}", F::TABLE_NAME));

        let data = TableData::new(system.clone(), instance, replication, db);
        let data = TableData::new(system.clone(), instance, replication, db, config);

        let merkle_updater = MerkleUpdater::new(data.clone());

@@ -131,7 +138,8 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
                who.as_ref(),
                rpc,
                RequestStrategy::with_priority(PRIO_NORMAL)
                    .with_quorum(self.data.replication.write_quorum()),
                    .with_quorum(self.data.replication.write_quorum())
                    .with_write_limiter(),
            )
            .await?;

@@ -527,6 +535,10 @@ impl<F: TableSchema, R: TableReplication> EndpointHandler<TableRpc<F>> for Table
                Ok(TableRpc::Update(values))
            }
            TableRpc::Update(pairs) => {
                let maybe_bounded = self.data.merkle_todo_bounded_queue.clone();
                if let Some(b) = maybe_bounded {
                    b.acquire_many(pairs.len() as u32).await.unwrap().forget();
                }
                self.data.update_many(pairs)?;
                Ok(TableRpc::Ok)
            }
---

@@ -135,6 +135,10 @@ pub struct Config {
    /// Configuration for the admin API endpoint
    #[serde(default = "Default::default")]
    pub admin: AdminConfig,

    /// --- Experimental
    #[serde(default = "Default::default")]
    pub experimental: ExperimentalConfig,
}

/// Value for data_dir: either a single directory or a list of dirs with attributes

@@ -255,6 +259,40 @@ pub struct KubernetesDiscoveryConfig {
    pub skip_crd: bool,
}

#[derive(Deserialize, Debug, Clone, Default)]
pub struct ExperimentalConfig {
    pub merkle_backpressure: MerkleBackpressureEnum,
    pub rpc_in_flight_limiters: RpcInFlightLimiterEnum,
}

#[derive(Deserialize, Debug, Clone, Default)]
#[serde(rename_all = "lowercase", tag = "kind")]
pub enum MerkleBackpressureEnum {
    #[default]
    None,
    FixedQueue(MerkleFixedQueue),
}

#[derive(Deserialize, Debug, Clone, Default)]
#[serde(rename_all = "lowercase", tag = "kind")]
pub enum RpcInFlightLimiterEnum {
    #[default]
    None,
    FixedSize(InFlightFixedSize),
}

#[derive(Deserialize, Debug, Clone, Default)]
pub struct InFlightFixedSize {
    #[serde(default = "default_max_table_write")]
    pub max_table_write: usize,
}

#[derive(Deserialize, Debug, Clone, Default)]
pub struct MerkleFixedQueue {
    #[serde(default = "default_max_queue_size")]
    pub max_queue_size: usize,
}

/// Read and parse configuration
pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {
    let config = std::fs::read_to_string(config_file)?;

@@ -281,6 +319,14 @@ fn default_compression() -> Option<i32> {
    Some(1)
}

fn default_max_table_write() -> usize {
    64
}

fn default_max_queue_size() -> usize {
    256
}

fn deserialize_compression<'de, D>(deserializer: D) -> Result<Option<i32>, D::Error>
where
    D: de::Deserializer<'de>,
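Based on the serde attributes in the config hunks above (internally tagged enums with tag = "kind", lowercased variant names, and the defaults of 64 and 256), the new experimental section would plausibly be written in garage.toml as follows. This is an inferred sketch of the syntax, not documentation taken from the repository:

# Hypothetical excerpt of garage.toml enabling both experimental features.

[experimental.merkle_backpressure]
kind = "fixedqueue"       # or "none" (the default)
max_queue_size = 256      # default from default_max_queue_size()

[experimental.rpc_in_flight_limiters]
kind = "fixedsize"        # or "none" (the default)
max_table_write = 64      # default from default_max_table_write()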