More work on Dirsync request sending

Signed-off-by: Luca Fulchir <luca.fulchir@runesauth.com>
This commit is contained in:
Luca Fulchir 2023-06-05 09:18:32 +02:00
parent 9634fbba31
commit 289c6c318e
Signed by: luca.fulchir
GPG Key ID: 8F6440603D13A78E
10 changed files with 302 additions and 117 deletions

View File

@ -16,9 +16,17 @@ impl From<[u8; 16]> for UserID {
impl UserID {
/// New random user id
pub fn new(rand: &Random) -> Self {
let mut ret = Self([0; 16]);
rand.fill(&mut ret.0);
ret
use ::core::mem::MaybeUninit;
let mut out: MaybeUninit<[u8; 16]> = MaybeUninit::uninit();
#[allow(unsafe_code)]
unsafe {
let _ = rand.fill(out.assume_init_mut());
Self(out.assume_init())
}
}
/// Anonymous user id
pub fn new_anonymous() -> Self {
UserID([0; 16])
}
/// length of the User ID in bytes
pub const fn len() -> usize {
@ -31,6 +39,16 @@ impl UserID {
pub struct Token([u8; 32]);
impl Token {
/// New random token, anonymous should not check this anyway
pub fn new_anonymous(rand: &Random) -> Self {
use ::core::mem::MaybeUninit;
let mut out: MaybeUninit<[u8; 32]> = MaybeUninit::uninit();
#[allow(unsafe_code)]
unsafe {
let _ = rand.fill(out.assume_init_mut());
Self(out.assume_init())
}
}
/// length of the token in bytes
pub const fn len() -> usize {
32
@ -68,7 +86,7 @@ pub type TokenChecker =
/// further limit to a "safe" subset of utf8
// SECURITY: TODO: limit to a subset of utf8
#[derive(Debug, Clone, PartialEq)]
pub struct Domain(String);
pub struct Domain(pub String);
impl TryFrom<&[u8]> for Domain {
type Error = ();

View File

@ -3,11 +3,15 @@
pub mod dirsync;
use crate::{
connection::{self, ProtocolVersion},
enc::sym::{HeadLen, TagLen},
auth::ServiceID,
connection::{self, Connection, IDRecv, ProtocolVersion},
enc::{
asym::{KeyID, PrivKey, PubKey},
sym::{HeadLen, TagLen},
},
};
use ::num_traits::FromPrimitive;
use ::std::rc::Rc;
use ::std::{collections::VecDeque, rc::Rc};
/// Handshake errors
#[derive(::thiserror::Error, Debug, Copy, Clone)]
@ -32,6 +36,9 @@ pub enum Error {
/// Could not generate Keys
#[error("Key generation failed")]
KeyGeneration,
/// Too many client handshakes currently running
#[error("Too many client handshakes")]
TooManyClientHandshakes,
}
/// List of possible handshakes
@ -66,20 +73,108 @@ impl TryFrom<&str> for HandshakeID {
}
pub(crate) struct HandshakeServer {
pub id: crate::enc::asym::KeyID,
pub key: crate::enc::asym::PrivKey,
pub id: KeyID,
pub key: PrivKey,
}
#[derive(Clone)]
pub(crate) struct HandshakeClient {
pub id: crate::enc::asym::KeyID,
pub key: crate::enc::asym::PrivKey,
pub service_id: crate::auth::ServiceID,
pub service_conn_id: connection::IDRecv,
pub connection: Rc<crate::connection::Connection>,
pub id: KeyID,
pub key: PrivKey,
pub service_id: ServiceID,
pub service_conn_id: IDRecv,
pub connection: Connection,
pub timeout: Rc<u32>,
}
/// Tracks the keys used by the client and the handshake
/// they are associated with
pub(crate) struct HandshakeClientList {
used: Vec<::bitmaps::Bitmap<1024>>, // index = KeyID
keys: Vec<Option<(PrivKey, PubKey)>>,
list: Vec<Option<HandshakeClient>>,
}
impl HandshakeClientList {
pub(crate) fn new() -> Self {
Self {
used: [::bitmaps::Bitmap::<1024>::new()].to_vec(),
keys: Vec::with_capacity(16),
list: Vec::with_capacity(16),
}
}
pub(crate) fn get(&self, id: KeyID) -> Option<&HandshakeClient> {
if id.0 as usize >= self.list.len() {
return None;
}
self.list[id.0 as usize].as_ref()
}
pub(crate) fn remove(&mut self, id: KeyID) -> Option<HandshakeClient> {
if id.0 as usize >= self.list.len() {
return None;
}
let used_vec_idx = id.0 as usize / 1024;
let used_bitmap_idx = id.0 as usize % 1024;
let used_iter = match self.used.get_mut(used_vec_idx) {
Some(used_iter) => used_iter,
None => return None,
};
used_iter.set(used_bitmap_idx, false);
self.keys[id.0 as usize] = None;
let mut owned = None;
::core::mem::swap(&mut self.list[id.0 as usize], &mut owned);
owned
}
pub(crate) fn add(
&mut self,
priv_key: PrivKey,
pub_key: PubKey,
service_id: ServiceID,
service_conn_id: IDRecv,
connection: Connection,
) -> Result<(KeyID, &HandshakeClient), ()> {
let maybe_free_key_idx =
self.used.iter().enumerate().find_map(|(idx, bmap)| {
match bmap.first_false_index() {
Some(false_idx) => Some(((idx * 1024), false_idx)),
None => None,
}
});
let free_key_idx = match maybe_free_key_idx {
Some((idx, false_idx)) => {
let free_key_idx = idx * 1024 + false_idx;
if free_key_idx > KeyID::MAX as usize {
return Err(());
}
self.used[idx].set(false_idx, true);
free_key_idx
}
None => {
let mut bmap = ::bitmaps::Bitmap::<1024>::new();
bmap.set(0, true);
self.used.push(bmap);
self.used.len() * 1024
}
};
if self.keys.len() >= free_key_idx {
self.keys.push(None);
self.list.push(None);
}
self.keys[free_key_idx] = Some((priv_key.clone(), pub_key));
self.list[free_key_idx] = Some(HandshakeClient {
id: KeyID(free_key_idx as u16),
key: priv_key,
service_id,
service_conn_id,
connection,
timeout: Rc::new(0),
});
Ok((
KeyID(free_key_idx as u16),
self.list[free_key_idx].as_ref().unwrap(),
))
}
}
/// Parsed handshake
#[derive(Debug, Clone)]
pub enum HandshakeData {

View File

@ -132,10 +132,7 @@ impl ConnList {
}
/// Only *Reserve* a connection,
/// without actually tracking it in self.connections
pub(crate) fn reserve_first(
&mut self,
mut conn: Connection,
) -> Rc<Connection> {
pub(crate) fn reserve_first(&mut self) -> IDRecv {
// uhm... bad things are going on here:
// * id must be initialized, but only because:
// * rust does not understand that after the `!found` id is always
@ -173,9 +170,7 @@ impl ConnList {
let actual_id = ((id_in_thread as u64) * (self.thread_id.total as u64))
+ (self.thread_id.id as u64);
let new_id = IDRecv(ID::new_u64(actual_id));
conn.id_recv = new_id;
// Return the new connection without tracking it
Rc::new(conn)
new_id
}
/// NOTE: does NOT check if the connection has been previously reserved!
pub(crate) fn track(&mut self, conn: Rc<Connection>) -> Result<(), ()> {

View File

@ -7,6 +7,8 @@ use ::trust_dns_resolver::TokioAsyncResolver;
pub mod record;
pub use record::Record;
use crate::auth::Domain;
/// Common errors for Dnssec setup and usage
#[derive(::thiserror::Error, Debug)]
pub enum Error {
@ -88,10 +90,10 @@ impl Dnssec {
}
const TXT_RECORD_START: &str = "v=Fenrir1 ";
/// Get the fenrir data for a domain
pub async fn resolv(&self, domain: &str) -> ::std::io::Result<String> {
pub async fn resolv(&self, domain: &Domain) -> ::std::io::Result<String> {
use ::trust_dns_client::rr::Name;
let fqdn_str = "_fenrir.".to_owned() + domain;
let fqdn_str = "_fenrir.".to_owned() + &domain.0;
::tracing::debug!("Resolving: {}", fqdn_str);
let fqdn = Name::from_utf8(&fqdn_str)?;
let answers = self.resolver.txt_lookup(fqdn).await?;

View File

@ -17,6 +17,8 @@ impl KeyID {
pub const fn len() -> usize {
2
}
/// Maximum possible KeyID
pub const MAX: u16 = u16::MAX;
/// Serialize into raw bytes
pub fn serialize(&self, out: &mut [u8; KeyID::len()]) {
out.copy_from_slice(&self.0.to_le_bytes());

View File

@ -51,6 +51,12 @@ impl Hkdf {
Hkdf::Sha3(sha3) => sha3.get_secret(context),
}
}
/// get the kind of this Hkdf
pub fn kind(&self) -> HkdfKind {
match self {
Hkdf::Sha3(_) => HkdfKind::Sha3,
}
}
}
// Hack & tricks:

View File

@ -39,7 +39,7 @@ impl CipherKind {
/// Additional Authenticated Data
#[derive(Debug)]
pub struct AAD<'a>(pub &'a mut [u8]);
pub struct AAD<'a>(pub &'a [u8]);
/// Cipher direction, to make sure we don't reuse the same cipher
/// for both decrypting and encrypting

View File

@ -5,20 +5,24 @@
pub(crate) mod worker;
use crate::{
auth,
auth::ServiceID,
connection::{
self,
handshake::{self, Handshake, HandshakeClient, HandshakeServer},
Connection,
handshake::{
self, Handshake, HandshakeClient, HandshakeClientList,
HandshakeServer,
},
Connection, IDRecv,
},
enc::{
self, asym,
self,
asym::{self, KeyID, PrivKey, PubKey},
hkdf::{Hkdf, HkdfKind},
sym::{CipherKind, CipherRecv},
},
Error,
};
use ::std::{rc::Rc, vec::Vec};
use ::std::vec::Vec;
/// Information needed to reply after the key exchange
#[derive(Debug, Clone)]
@ -35,13 +39,13 @@ pub(crate) struct AuthNeededInfo {
#[derive(Debug)]
pub(crate) struct ClientConnectInfo {
/// The service ID that we are connecting to
pub service_id: auth::ServiceID,
pub service_id: ServiceID,
/// The service ID that we are connecting to
pub service_connection_id: connection::IDRecv,
pub service_connection_id: IDRecv,
/// Parsed handshake packet
pub handshake: Handshake,
/// Connection
pub connection: Rc<Connection>,
pub connection: Connection,
}
/// Intermediate actions to be taken while parsing the handshake
#[derive(Debug)]
@ -63,11 +67,11 @@ pub(crate) struct ThreadTracker {
pub id: u16,
}
/// Async free but thread safe tracking of handhsakes and conenctions
/// Tracking of handhsakes and conenctions
/// Note that we have multiple Handshake trackers, pinned to different cores
/// Each of them will handle a subset of all handshakes.
/// Each handshake is routed to a different tracker with:
/// (udp_src_sender_port % total_threads) - 1
/// Each handshake is routed to a different tracker by checking
/// core = (udp_src_sender_port % total_threads) - 1
pub(crate) struct HandshakeTracker {
thread_id: ThreadTracker,
key_exchanges: Vec<(asym::KeyKind, asym::KeyExchangeKind)>,
@ -75,12 +79,8 @@ pub(crate) struct HandshakeTracker {
/// ephemeral keys used server side in key exchange
keys_srv: Vec<HandshakeServer>,
/// ephemeral keys used client side in key exchange
hshake_cli: Vec<HandshakeClient>,
hshake_cli: HandshakeClientList,
}
#[allow(unsafe_code)]
unsafe impl Send for HandshakeTracker {}
#[allow(unsafe_code)]
unsafe impl Sync for HandshakeTracker {}
impl HandshakeTracker {
pub(crate) fn new(thread_id: ThreadTracker) -> Self {
@ -89,9 +89,25 @@ impl HandshakeTracker {
ciphers: Vec::new(),
key_exchanges: Vec::new(),
keys_srv: Vec::new(),
hshake_cli: Vec::new(),
hshake_cli: HandshakeClientList::new(),
}
}
pub(crate) fn new_client(
&mut self,
priv_key: PrivKey,
pub_key: PubKey,
service_id: ServiceID,
service_conn_id: IDRecv,
connection: Connection,
) -> Result<(KeyID, &HandshakeClient), ()> {
self.hshake_cli.add(
priv_key,
pub_key,
service_id,
service_conn_id,
connection,
)
}
pub(crate) fn recv_handshake(
&mut self,
mut handshake: Handshake,
@ -105,7 +121,6 @@ impl HandshakeTracker {
if let Some(h_k) =
self.keys_srv.iter().find(|k| k.id == req.key_id)
{
use enc::asym::PrivKey;
// Directory synchronized can only use keys
// for key exchange, not signing keys
if let PrivKey::Exchange(k) = &h_k.key {
@ -175,20 +190,9 @@ impl HandshakeTracker {
}));
}
DirSync::Resp(resp) => {
let hshake_idx = {
match self
.hshake_cli
.iter()
.position(|h| h.id == resp.client_key_id)
{
Some(h) => Some(h.clone()),
None => None,
}
};
let hshake_idx = {
if let Some(real_idx) = hshake_idx {
real_idx
} else {
let hshake = match self.hshake_cli.get(resp.client_key_id) {
Some(hshake) => hshake,
None => {
::tracing::debug!(
"No such client key id: {:?}",
resp.client_key_id
@ -196,7 +200,6 @@ impl HandshakeTracker {
return Err(handshake::Error::UnknownKeyID.into());
}
};
let hshake = &self.hshake_cli[hshake_idx];
let cipher_recv = &hshake.connection.cipher_recv;
use crate::enc::sym::AAD;
// no aad for now
@ -212,14 +215,8 @@ impl HandshakeTracker {
return Err(handshake::Error::Key(e).into());
}
}
// we can remove the handshake from the list
let hshake: HandshakeClient = {
let len = self.hshake_cli.len();
if (hshake_idx + 1) != len {
self.hshake_cli.swap(hshake_idx, len - 1);
}
self.hshake_cli.pop().unwrap()
};
let hshake =
self.hshake_cli.remove(resp.client_key_id).unwrap();
return Ok(HandshakeAction::ClientConnect(
ClientConnectInfo {
service_id: hshake.service_id,

View File

@ -1,6 +1,6 @@
//! Worker thread implementation
use crate::{
auth::{ServiceID, TokenChecker},
auth::{Domain, ServiceID, Token, TokenChecker, UserID},
config::Config,
connection::{
self,
@ -14,10 +14,9 @@ use crate::{
},
dnssec,
enc::{
asym::{self, PrivKey, PubKey},
asym::{PrivKey, PubKey},
hkdf::{self, Hkdf, HkdfKind},
sym::{self},
Random, Secret,
sym, Random, Secret,
},
inner::{HandshakeAction, HandshakeTracker, ThreadTracker},
};
@ -36,17 +35,19 @@ pub(crate) struct RawUdp {
pub packet: Packet,
}
pub(crate) struct ConnectInfo {
pub answer: oneshot::Sender<Result<(PubKey, IDSend), crate::Error>>,
pub resolved: dnssec::Record,
pub service_id: ServiceID,
pub domain: Domain,
// TODO: UserID, Token information
}
pub(crate) enum Work {
/// ask the thread to report to the main thread the total number of
/// connections present
CountConnections(oneshot::Sender<usize>),
Connect(
(
oneshot::Sender<Result<(PubKey, IDSend), crate::Error>>,
dnssec::Record,
ServiceID,
),
),
Connect(ConnectInfo),
Recv(RawUdp),
}
pub(crate) enum WorkAnswer {
@ -162,13 +163,13 @@ impl Worker {
let conn_num = self.connections.len();
let _ = sender.send(conn_num);
}
Work::Connect((send_res, dnssec_record, _service_id)) => {
Work::Connect(conn_info) => {
// PERF: geolocation
// Find the first destination with a coherent
// pubkey/key exchange
let destination =
dnssec_record.addresses.iter().find_map(|addr| {
conn_info.resolved.addresses.iter().find_map(|addr| {
if addr
.handshake_ids
.iter()
@ -187,7 +188,8 @@ impl Worker {
// *we* support
for idx in addr.public_key_idx.iter() {
let key_supported_k_x =
dnssec_record.public_keys[idx.0 as usize]
conn_info.resolved.public_keys
[idx.0 as usize]
.1
.kind()
.key_exchanges();
@ -200,7 +202,7 @@ impl Worker {
Some(exchange) => {
return Some((
addr,
dnssec_record.public_keys
conn_info.resolved.public_keys
[idx.0 as usize],
exchange.clone(),
))
@ -214,7 +216,9 @@ impl Worker {
Some((addr, key, exchange)) => (addr, key, exchange),
None => {
let _ =
send_res.send(Err(crate::Error::Resolution(
conn_info
.answer
.send(Err(crate::Error::Resolution(
"No selectable address and key combination"
.to_owned(),
)));
@ -223,11 +227,11 @@ impl Worker {
};
let hkdf_selected = match hkdf::client_select_hkdf(
&self.cfg,
&dnssec_record.hkdfs,
&conn_info.resolved.hkdfs,
) {
Some(hkdf_selected) => hkdf_selected,
None => {
let _ = send_res.send(Err(
let _ = conn_info.answer.send(Err(
handshake::Error::Negotiation.into(),
));
continue 'mainloop;
@ -235,23 +239,24 @@ impl Worker {
};
let cipher_selected = match sym::client_select_cipher(
&self.cfg,
&dnssec_record.ciphers,
&conn_info.resolved.ciphers,
) {
Some(cipher_selected) => cipher_selected,
None => {
let _ = send_res.send(Err(
let _ = conn_info.answer.send(Err(
handshake::Error::Negotiation.into(),
));
continue 'mainloop;
}
};
// FIXME: save KeyID
let (priv_key, pub_key) =
match exchange.new_keypair(&self.rand) {
Ok(pair) => pair,
Err(_) => {
::tracing::error!("Failed to generate keys");
let _ = send_res.send(Err(
let _ = conn_info.answer.send(Err(
handshake::Error::KeyGeneration.into(),
));
continue 'mainloop;
@ -266,7 +271,7 @@ impl Worker {
::tracing::warn!(
"Could not run the key exchange"
);
let _ = send_res.send(Err(
let _ = conn_info.answer.send(Err(
handshake::Error::Negotiation.into(),
));
continue 'mainloop;
@ -279,25 +284,79 @@ impl Worker {
// are PubKey::Exchange
unreachable!()
}
let mut conn = Connection::new(
hkdf,
cipher_selected,
connection::Role::Client,
&self.rand,
);
let auth_recv_id = self.connections.reserve_first();
let service_conn_id = self.connections.reserve_first();
conn.id_recv = auth_recv_id;
let (client_key_id, hshake) = match self
.handshakes
.new_client(
PrivKey::Exchange(priv_key),
PubKey::Exchange(pub_key),
conn_info.service_id,
service_conn_id,
conn,
) {
Ok((client_key_id, hshake)) => (client_key_id, hshake),
Err(_) => {
::tracing::warn!("Too many client handshakes");
let _ = conn_info.answer.send(Err(
handshake::Error::TooManyClientHandshakes
.into(),
));
continue 'mainloop;
}
};
// build request
/*
let auth_info = dirsync::AuthInfo {
user: UserID::new_anonymous(),
token: Token::new_anonymous(&self.rand),
service_id: conn_info.service_id,
domain: conn_info.domain,
};
let req_data = dirsync::ReqData {
nonce: dirsync::Nonce::new(&self.rand),
client_key_id:
client_key_id,
id: auth_recv_id.0,
auth: auth_info,
};
let req = dirsync::Req {
key_id: key.0,
exchange,
hkdf,
cipher,
exchange_key: client_pub_key,
data: 42,
hkdf: hkdf_selected,
cipher: cipher_selected,
exchange_key: pub_key,
data: dirsync::ReqInner::ClearText(req_data),
};
*/
let mut raw = Vec::<u8>::with_capacity(req.len());
req.serialize(
cipher_selected.nonce_len(),
cipher_selected.tag_len(),
&mut raw[..],
);
// encrypt
let encrypt_start = req.encrypted_offset();
let encrypt_end = encrypt_start + req.encrypted_length();
if let Err(e) = hshake.connection.cipher_send.encrypt(
sym::AAD(&[]),
&mut raw[encrypt_start..encrypt_end],
) {
::tracing::error!("Can't encrypt DirSync Request");
let _ = conn_info.answer.send(Err(e.into()));
continue 'mainloop;
}
// start timeout
// send packet
// send packeti
//self.send_packet(raw,
todo!()
}
@ -395,15 +454,16 @@ impl Worker {
let head_len = req.cipher.nonce_len();
let tag_len = req.cipher.tag_len();
let mut raw_conn = Connection::new(
let mut auth_conn = Connection::new(
authinfo.hkdf,
req.cipher,
connection::Role::Server,
&self.rand,
);
raw_conn.id_send = IDSend(req_data.id);
auth_conn.id_send = IDSend(req_data.id);
// track connection
let auth_conn = self.connections.reserve_first(raw_conn);
let auth_id_recv = self.connections.reserve_first();
auth_conn.id_recv = auth_id_recv;
let resp_data = dirsync::RespData {
client_nonce: req_data.nonce,
@ -444,7 +504,7 @@ impl Worker {
self.send_packet(raw_out, udp.src, udp.dst).await;
return;
}
HandshakeAction::ClientConnect(mut cci) => {
HandshakeAction::ClientConnect(cci) => {
let ds_resp;
if let HandshakeData::DirSync(DirSync::Resp(resp)) =
cci.handshake.data
@ -465,17 +525,17 @@ impl Worker {
);
return;
}
{
let conn = Rc::get_mut(&mut cci.connection).unwrap();
conn.id_send = IDSend(resp_data.id);
}
let mut conn = cci.connection;
conn.id_send = IDSend(resp_data.id);
let id_recv = conn.id_recv;
let cipher = conn.cipher_recv.kind();
// track the connection to the authentication server
if self.connections.track(cci.connection.clone()).is_err() {
self.connections.delete(cci.connection.id_recv);
if self.connections.track(Rc::new(conn)).is_err() {
::tracing::error!("Could not track new connection");
self.connections.delete(id_recv);
return;
}
if cci.connection.id_recv.0
== resp_data.service_connection_id
{
if id_recv.0 == resp_data.service_connection_id {
// the user asked a single connection
// to the authentication server, without any additional
// service. No more connections to setup
@ -492,7 +552,7 @@ impl Worker {
);
let mut service_connection = Connection::new(
hkdf,
cci.connection.cipher_recv.kind(),
cipher,
connection::Role::Client,
&self.rand,
);

View File

@ -22,17 +22,16 @@ mod inner;
use ::std::{sync::Arc, vec::Vec};
use ::tokio::{net::UdpSocket, sync::Mutex};
use auth::ServiceID;
use crate::{
auth::TokenChecker,
auth::{Domain, ServiceID, TokenChecker},
connection::{
handshake,
socket::{SocketList, UdpClient, UdpServer},
AuthServerConnections, Packet,
},
inner::{
worker::{RawUdp, Work, Worker},
worker::{ConnectInfo, RawUdp, Work, Worker},
ThreadTracker,
},
};
@ -62,6 +61,9 @@ pub enum Error {
/// Resolution problems. wrong or incomplete DNSSEC data
#[error("DNSSEC resolution: {0}")]
Resolution(String),
/// Wrapper on encryption errors
#[error("Encrypt: {0}")]
Encrypt(enc::Error),
}
/// Instance of a fenrir endpoint
@ -232,7 +234,7 @@ impl Fenrir {
Ok(())
}
/// Get the raw TXT record of a Fenrir domain
pub async fn resolv_txt(&self, domain: &str) -> Result<String, Error> {
pub async fn resolv_txt(&self, domain: &Domain) -> Result<String, Error> {
match &self.dnssec {
Some(dnssec) => Ok(dnssec.resolv(domain).await?),
None => Err(Error::NotInitialized),
@ -240,7 +242,10 @@ impl Fenrir {
}
/// Get the raw TXT record of a Fenrir domain
pub async fn resolv(&self, domain: &str) -> Result<dnssec::Record, Error> {
pub async fn resolv(
&self,
domain: &Domain,
) -> Result<dnssec::Record, Error> {
let record_str = self.resolv_txt(domain).await?;
Ok(dnssec::Dnssec::parse_txt_record(&record_str)?)
}
@ -248,7 +253,7 @@ impl Fenrir {
/// Connect to a service
pub async fn connect(
&self,
domain: &str,
domain: &Domain,
service: ServiceID,
) -> Result<(), Error> {
let resolved = self.resolv(domain).await?;
@ -310,7 +315,12 @@ impl Fenrir {
// and tell that thread to connect somewhere
let (send, recv) = ::tokio::sync::oneshot::channel();
let _ = self._thread_work[thread_idx]
.send(Work::Connect((send, resolved.clone(), service)))
.send(Work::Connect(ConnectInfo {
answer: send,
resolved: resolved.clone(),
service_id: service,
domain: domain.clone(),
}))
.await;
match recv.await {