1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
//! Data models: the wrapper types and type aliases used throughout the project.

use crdts::SList;

use crate::tasks::SwimJobHandle;

mod_use::mod_use![config, message, msg_impl, event,];

use std::{
    fmt::Display,
    net::SocketAddr,
    time::{SystemTime, UNIX_EPOCH},
};

use crdts::list::Op;
use serde::{Deserialize, Serialize};

/// A single operation on a topic's log list.
///
/// This is [`crdts::list::Op`] instantiated with [`Log`] as the value type and
/// [`Actor`] as the actor type.
pub type LogOp = Op<Log, Actor>;

/// Actor identifier used by the CRDT to tell participants apart and to track
/// causality between their operations.
///
/// It is used every time the logs list is updated. The value is a plain `u64`
/// newtype; `#[repr(transparent)]` gives it the same layout as the inner `u64`.
#[repr(transparent)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Actor(u64);

impl Actor {
    pub fn new(id: u64) -> Self {
        Self(id)
    }

    pub fn into_inner(self) -> u64 {
        self.0
    }

    pub fn random() -> Self {
        Self(rand::random())
    }
}

impl<T: Into<u64>> From<T> for Actor {
    fn from(val: T) -> Self {
        Self(val.into())
    }
}

/// UNIX timestamp stored as an unsigned 64-bit integer.
///
/// Values produced by [`Timestamp::now`] count milliseconds since the UNIX
/// epoch; values built through `From<u64>` are stored as given.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Timestamp(u64);

impl Timestamp {
    /// Returns the current wall-clock time as milliseconds since the UNIX epoch.
    ///
    /// This never panics: a system clock that reads earlier than the epoch is
    /// clamped to `0`, and a millisecond count that does not fit in a `u64`
    /// saturates at `u64::MAX` instead of being silently truncated.
    pub fn now() -> Self {
        let since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            // `Err` means the clock is set before 1970-01-01; treat it as the epoch
            // rather than crashing the node over a misconfigured clock.
            .unwrap_or_default();

        // `as_millis` yields a `u128`; cap it first so the narrowing cast is lossless.
        let millis = since_epoch.as_millis().min(u128::from(u64::MAX)) as u64;

        Self(millis)
    }

    /// Consumes the timestamp and returns the raw `u64` it wraps.
    pub fn into_inner(self) -> u64 {
        self.0
    }
}

impl From<u64> for Timestamp {
    fn from(val: u64) -> Self {
        Self(val)
    }
}

impl From<Timestamp> for u64 {
    fn from(val: Timestamp) -> Self {
        val.0
    }
}

/// Checks that the `Timestamp` newtype adds no bytes on the wire: it must
/// serialize to exactly the same bytes as a bare `u64` under the project codec.
#[test]
fn test_ts_serde_overhead() {
    use bincode::Options;

    use crate::codec::bincode_option;

    let wrapped = bincode_option().serialize(&Timestamp(0)).unwrap();
    let bare = bincode_option().serialize(&0u64).unwrap();

    assert_eq!(wrapped, bare);
}

/// Identity of a node: its socket address plus a revision counter.
///
/// Two ids with the same `addr` but different `rev` compare as different
/// values (the derived `PartialEq`/`Hash` cover both fields).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Id {
    /// Socket address of the node.
    addr: SocketAddr,
    /// Revision counter; starts at 0 when built from a `SocketAddr` and is
    /// incremented each time the identity is renewed.
    rev: u64, // TODO: use other id, like uuid
}

impl Id {
    /// Returns a copy of the socket address stored in this identity.
    pub fn addr(&self) -> SocketAddr {
        let Self { addr, .. } = self;
        *addr
    }
}

impl Display for Id {
    /// Renders the identity as `<addr>@<rev>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { addr, rev } = self;
        f.write_fmt(format_args!("{}@{}", addr, rev))
    }
}

impl foca::Identity for Id {
    /// Produces the follow-up identity: same address, revision bumped by one.
    ///
    /// The revision wraps around to 0 on overflow, so this always returns `Some`.
    fn renew(&self) -> Option<Self> {
        let renewed = Self {
            rev: self.rev.wrapping_add(1),
            ..*self
        };

        Some(renewed)
    }

    /// Two identities share a prefix when their addresses match; the revision
    /// is ignored.
    fn has_same_prefix(&self, other: &Self) -> bool {
        other.addr == self.addr
    }
}

/// Builds the initial identity for an address, starting at revision 0.
impl From<SocketAddr> for Id {
    fn from(addr: SocketAddr) -> Self {
        let rev = 0;
        Self { addr, rev }
    }
}

// TODO: placeholder for LimitLog
/// A single log entry: a text message together with the time it was created.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct Log {
    /// The log text.
    message: String,
    /// Timestamp recorded when the entry was constructed.
    ts: Timestamp,
}

impl Log {
    /// Creates a log entry from `message`, stamped with the current time.
    pub fn new(message: impl Into<String>) -> Self {
        let message = message.into();
        let ts = Timestamp::now();

        Self { message, ts }
    }

    /// Borrows the text of this entry.
    pub fn message(&self) -> &str {
        self.message.as_str()
    }

    /// Returns the timestamp recorded when this entry was created.
    pub fn ts(&self) -> Timestamp {
        let Self { ts, .. } = self;
        *ts
    }

    /// Test helper: an entry with a random alphanumeric message of 5 to 14
    /// characters, stamped with the current time.
    #[cfg(test)]
    pub fn random() -> Self {
        use rand::{distributions::Alphanumeric, thread_rng, Rng};

        let len = thread_rng().gen_range(5..15);
        let message: String = thread_rng()
            .sample_iter(&Alphanumeric)
            .take(len)
            .map(char::from)
            .collect();
        let ts = Timestamp::now();

        Self { message, ts }
    }
}

/// Snapshot of the state of an Orkas node.
///
/// Holds one cancellation flag plus one [`ChannelState`] per internal channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct State {
    /// NOTE(review): presumably true once the node has been asked to stop — confirm
    /// against the code that fills this in.
    pub cancelled: bool,
    /// State of the `msg` channel.
    pub msg: ChannelState,
    /// State of the `crdt` channel.
    pub crdt: ChannelState,
    /// State of the `swim` channel.
    pub swim: ChannelState,
    /// State of the `inbound` channel.
    pub inbound: ChannelState,
    /// State of the `outbound` channel.
    pub outbound: ChannelState,
    /// State of the `crdt_topics` channel.
    pub crdt_topics: ChannelState,
}

/// Per-channel state reported inside [`State`].
///
/// Currently carries no fields; NOTE(review): looks like a placeholder to be
/// filled in later — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct ChannelState {}

/// Represents a single topic, which is stored in the global topics map and can
/// be updated separately and synced throughout a single topic cluster.
#[derive(Debug)]
pub(crate) struct Topic {
    /// The CRDT list of log entries for this topic.
    pub logs: LogList,
    /// Handle to this topic's SWIM job; stopped by [`Topic::stop`].
    pub swim: SwimJobHandle,
    /// The `limlog` topic backing this topic; stopped by [`Topic::stop`].
    pub map: limlog::Topic,
}

impl Topic {
    /// Stops this topic: first its SWIM job handle, then the `limlog` topic.
    pub fn stop(&self) {
        let Self { swim, map, .. } = self;

        swim.stop();
        map.stop()
    }
}

/// The list of [`Log`] entries kept per topic: a [`crdts::SList`] with
/// [`Actor`] as its actor type.
pub type LogList = SList<Log, Actor>;