1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
//! Background connection management
//!
//! For now we use a pure-Rust TCP connection to transport the messages.
//! In the future we will switch to QUIC or another protocol with Elixir for
//! better performance, scalability and security.

use std::{
    collections::{hash_map, HashMap},
    net::SocketAddr,
    pin::pin,
    time::Duration,
};

use color_eyre::Result;
use futures::{
    future::{select, Either},
    stream::SelectAll,
    SinkExt, StreamExt,
};
use kanal::AsyncReceiver;
use tokio::{
    net::{TcpListener, TcpStream},
    select,
};
use tracing::{debug, info, trace, warn};
use uuid7::uuid7;

use crate::{
    codec::adapt,
    consts::DEFAULT_CHANNEL_SIZE,
    model::Envelope,
    tasks::{ContextRef, Inbound, Outbound},
    util::ok_or_continue,
    Message,
};

/// Aggregate all inbound data and dispatch them to corresponding handler.
///
/// New inbound streams arrive on `recv` and are merged into a single
/// [`SelectAll`]. Every decoded [`Envelope`] is routed by its topic:
///
/// * `Message::Swim` payloads are forwarded to the topic's swim handle.
/// * `Message::RequestSnapshot` is answered by queueing a `Message::Snapshot`
///   envelope (carrying a clone of the topic's logs) on `ctx.msg`, addressed
///   to the `addr` of the request envelope.
/// * `Message::Snapshot` is currently only traced.
///
/// Envelopes for topics that are not registered in `ctx.topics` are logged
/// and dropped. Stream read errors are logged and do not stop the task.
///
/// # Errors
///
/// Returns an error when receiving from `recv` fails. Returns `Ok(())` once
/// `ctx.cancel_token` is cancelled.
pub(super) async fn inbound_task(recv: AsyncReceiver<Inbound>, ctx: ContextRef) -> Result<()> {
    let mut streams = SelectAll::<Inbound>::new();
    loop {
        if ctx.cancel_token.is_cancelled() {
            break;
        }

        let msg = select! {
            stream = recv.recv() => {
                streams.push(stream?);
                continue
            },
            // An empty `SelectAll` resolves to `None` immediately. Polling it
            // while empty turned the idle task into a 100ms sleep loop that
            // delayed both new stream registration and cancellation, so the
            // branch is disabled until there is at least one stream.
            msg = streams.next(), if !streams.is_empty() => msg,
            _ = ctx.cancel_token.cancelled() => break,
        };

        match msg {
            Some(Ok(Envelope {
                addr, topic, body, ..
            })) => {
                let Some(handle) = ctx.topics.get(&topic) else {
                    // The body has not been inspected yet, so no message type
                    // is attached to this event.
                    info!(
                        target: "inbound",
                        topic,
                        "Non-exist topic, ignore",
                    );
                    continue;
                };

                match body {
                    Message::Swim(bytes) => {
                        trace!(target: "inbound", message_type = "swim", ?bytes, "Received swim data");

                        ok_or_continue!("inbound", handle.value().swim.send_external(bytes).await);
                    }
                    Message::RequestSnapshot => {
                        trace!(target: "inbound", message_type = "request_snapshot", "Received request snapshot");

                        let snapshot = handle.value().logs.clone();
                        let res = ctx
                            .msg
                            .send(Envelope {
                                addr,
                                topic: topic.clone(),
                                body: Message::Snapshot { snapshot },
                                id: uuid7(),
                            })
                            .await;
                        ok_or_continue!("inbound", res)
                    }
                    Message::Snapshot { .. } => {
                        trace!(target: "inbound", message_type = "snapshot", "Received snapshot");

                        // TODO: handle snapshot by replacing the topic's logs
                        // with the received snapshot.
                    }
                }
            }
            Some(Err(e)) => {
                warn!("Error while reading inbound stream: {}", e);
            }
            None => {
                // Only reachable when the last registered stream finished
                // during this poll. Pause briefly before going back to
                // waiting for new streams.
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        }
    }
    Ok(())
}

/// Aggregate all outbound data and dispatch them to corresponding targets.
pub(super) async fn outbound_task(
    msg_recv: AsyncReceiver<Envelope>,
    conn_recv: AsyncReceiver<(SocketAddr, Outbound)>,
    ctx: ContextRef,
) -> Result<()> {
    // TODO: maybe change this to LRU?
    let mut map = HashMap::with_capacity(DEFAULT_CHANNEL_SIZE);

    loop {
        if ctx.cancel_token.is_cancelled() {
            break;
        }

        let msg = select! {
            _ = ctx.cancel_token.cancelled() => break,
            msg = msg_recv.recv() => { msg? },
            conn = conn_recv.recv() => {
                let (addr, conn) = ok_or_continue!("outbound", conn);
                map.insert(addr, conn);
                continue
            }
        };

        let addr = msg.addr;

        trace!(target: "outbound", %addr, ?msg, "Sending message");

        // TODO: better retry
        let mut retry = 3;
        loop {
            if let hash_map::Entry::Vacant(entry) = map.entry(addr) {
                let conn = TcpStream::connect(addr)
                    .await
                    .map(|s| adapt(s.into_split()));
                let (stream, sink) = ok_or_continue!("outbound", conn);
                entry.insert(sink);

                // Continue even if inbound has stopped
                drop(ctx.conn_inbound.send(stream).await);
            };
            let conn = map.get_mut(&addr).unwrap();
            let e = match conn.send(msg.clone()).await {
                Ok(_) => break,
                Err(e) => {
                    debug!(target: "outbound", error = ?e, "Send failed, retry");
                    map.remove(&addr);
                    e
                }
            };
            retry -= 1;
            if retry == 0 {
                warn!(target: "outbound", error = ?e, %addr, "Failed to send message");
                break;
            }
        }
    }
    Ok(())
}

/// Accept income connections and send them to the inbound and outbound task
pub(super) async fn listener_task(listener: TcpListener, ctx: ContextRef) -> Result<()> {
    loop {
        let (stream, src) =
            match select(pin!(ctx.cancel_token.cancelled()), pin!(listener.accept())).await {
                Either::Left(_) => break,
                Either::Right((t, _)) => ok_or_continue!("listener", t),
            };
        tracing::trace!(target: "listener", %src, "New connection");
        let (stream, sink) = adapt(stream.into_split());
        match (
            ctx.conn_inbound.send(stream).await,
            ctx.conn_outbound.send((src, sink)).await,
        ) {
            (Ok(_), Ok(_)) => {}
            (Err(e), _) | (_, Err(e)) => tracing::error!("Error in listener: {}", e),
        }
    }
    Ok(())
}