1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
use lifeline::dyn_bus::DynBus;
use lifeline::{Bus, Lifeline, Receiver, Service, Task};
use microkv::namespace::NamespaceMicroKV;
use postage::broadcast;
use thegraph_liketh::component::TheGraphLikeEthComponent;
use thegraph_liketh::types::LikethChain;

use component_ethereum::web3::Web3Component;
use component_state::state::BridgeState;
use support_common::config::{Config, Names};
use support_lifeline::service::BridgeService;
use support_tracker::Tracker;

use crate::bridge::PangolinRopstenTask;
use crate::bridge::{PangolinRopstenBus, PangolinRopstenConfig};
use crate::bridge::{ToExtrinsicsMessage, ToRelayMessage};
use crate::service::affirm::handler::AffirmHandler;

mod handler;

/// Affirm service of the pangolin-ropsten bridge.
///
/// Holds the handles returned by `Self::try_task` for the three background
/// tasks started in `Service::spawn`: scan, relay and command.
#[derive(Debug)]
pub struct AffirmService {
    /// Handle of the `*-service-affirm-scan` task.
    _greet_scan: Lifeline,
    /// Handle of the `*-service-affirm-relay` task.
    _greet_relay: Lifeline,
    /// Handle of the `*-service-affirm-command` task.
    _greet_command: Lifeline,
}

// Empty implementation: no `BridgeService` items are defined or overridden here.
impl BridgeService for AffirmService {}

impl Service for AffirmService {
    type Bus = PangolinRopstenBus;
    type Lifeline = color_eyre::Result<Self>;

    /// Starts the three affirm background tasks on `bus`.
    ///
    /// * scan    - runs [`start_scan`] with a tracker keyed `scan.ropsten.affirm`.
    /// * relay   - runs [`handle_affirm_relay`] and logs the error if it returns one.
    /// * command - receives [`ToRelayMessage`]s from the bus and forwards every
    ///   `EthereumBlockNumber` to `AffirmHandler::update_target`.
    ///
    /// # Errors
    ///
    /// Fails when the [`BridgeState`] resource cannot be cloned from the bus
    /// storage, or when one of the bus channels cannot be taken.
    fn spawn(bus: &Self::Bus) -> Self::Lifeline {
        // State
        let state = bus.storage().clone_resource::<BridgeState>()?;

        // affirm scan
        // `Tracker::new` takes the store by value, so the scan handler gets its
        // own namespaced store.
        let tracker = Tracker::new(
            state.microkv_with_namespace(PangolinRopstenTask::name()),
            "scan.ropsten.affirm",
        );
        let microkv_scan = state.microkv_with_namespace(PangolinRopstenTask::name());
        let sender_to_extrinsics_scan = bus.tx::<ToExtrinsicsMessage>()?;
        let _greet_scan = Self::try_task(
            &format!("{}-service-affirm-scan", PangolinRopstenTask::name()),
            async move {
                // The `async move` block owns these values and never reuses them,
                // so they are handed over directly instead of being cloned.
                start_scan(tracker, microkv_scan, sender_to_extrinsics_scan).await;
                Ok(())
            },
        );

        // affirm relay service
        let sender_to_extrinsics_relay = bus.tx::<ToExtrinsicsMessage>()?;
        let microkv_relay = state.microkv_with_namespace(PangolinRopstenTask::name());
        let _greet_relay = Self::try_task(
            &format!("{}-service-affirm-relay", PangolinRopstenTask::name()),
            async move {
                // A relay failure is logged, not propagated: the task itself
                // always finishes with `Ok(())`.
                if let Err(e) =
                    handle_affirm_relay(microkv_relay, sender_to_extrinsics_relay).await
                {
                    tracing::error!(
                        target: "pangolin-ropsten",
                        "[ropsten] [affirm] [handle] Failed to handle affirm relay, err: {:?}",
                        e
                    );
                }
                Ok(())
            },
        );

        // receive affirm command
        let mut rx = bus.rx::<ToRelayMessage>()?;
        let sender_to_extrinsics_command = bus.tx::<ToExtrinsicsMessage>()?;
        let microkv_command = state.microkv_with_namespace(PangolinRopstenTask::name());
        let _greet_command = Self::try_task(
            &format!("{}-service-affirm-command", PangolinRopstenTask::name()),
            async move {
                let handler =
                    AffirmHandler::new(microkv_command, sender_to_extrinsics_command).await;

                // Runs until `recv` yields `None`.
                while let Some(recv) = rx.recv().await {
                    // Kept as an exhaustive `match` so that a new
                    // `ToRelayMessage` variant becomes a compile error here.
                    match recv {
                        ToRelayMessage::EthereumBlockNumber(block_number) => {
                            tracing::trace!(
                                target: "pangolin-ropsten",
                                "[ropsten] [affirm] [command] Received new ethereum block number to affirm: {}",
                                block_number
                            );
                            // A failed update is logged; the loop keeps serving
                            // later messages.
                            if let Err(e) = handler.update_target(block_number) {
                                tracing::error!(target: "pangolin-ropsten", "{:?}", e);
                            }
                        }
                    }
                }

                Ok(())
            },
        );
        Ok(Self {
            _greet_scan,
            _greet_relay,
            _greet_command,
        })
    }
}

/// Drives the affirm relay: calls `AffirmHandler::affirm` forever, pausing
/// between rounds.
///
/// After a successful round the task sleeps for the configured
/// `interval_relay` seconds. After a failed round the error is logged, the
/// task waits 10 seconds, and a fresh [`AffirmHandler`] is built before the
/// next attempt.
///
/// # Errors
///
/// Only returns if the bridge configuration cannot be restored; once the loop
/// has started it never exits on its own.
async fn handle_affirm_relay(
    microkv: NamespaceMicroKV,
    sender_to_extrinsics: broadcast::Sender<ToExtrinsicsMessage>,
) -> color_eyre::Result<()> {
    // Config
    let bridge_config: PangolinRopstenConfig = Config::restore(Names::BridgePangolinRopsten)?;
    let task_config = bridge_config.task;

    // Both pauses are loop-invariant, so build them once.
    let relay_pause = std::time::Duration::from_secs(task_config.interval_relay);
    let retry_pause = std::time::Duration::from_secs(10);

    let mut handler = AffirmHandler::new(microkv.clone(), sender_to_extrinsics.clone()).await;
    loop {
        match handler.affirm().await {
            Ok(_) => {}
            Err(cause) => {
                tracing::error!(
                    target: "pangolin-ropsten",
                    "[ropsten] [affirm] [handle] affirm err: {:#?}", cause
                );
                // TODO: Consider the errors more carefully
                // Maybe a websocket err, so wait 10 secs to reconnect.
                tokio::time::sleep(retry_pause).await;
                handler = AffirmHandler::new(microkv.clone(), sender_to_extrinsics.clone()).await;
                // Retry right away with the new handler, skipping the regular pause.
                continue;
            }
        }

        tokio::time::sleep(relay_pause).await;
    }
}

async fn start_scan(
    tracker: Tracker,
    microkv: NamespaceMicroKV,
    sender_to_extrinsics: broadcast::Sender<ToExtrinsicsMessage>,
) {
    while let Err(err) = run_scan(&tracker, microkv.clone(), sender_to_extrinsics.clone()).await {
        tracing::error!(
            target: "pangolin-ropsten",
            "[ropsten] [affirm] [scan] Failed to run scan ropsten transaction. err: {:?}",
            err
        );
    }
}

/// Polls for the latest Ropsten transaction and, once it is old enough,
/// hands the following block number to the affirm handler.
///
/// Every iteration:
/// 1. reads the tracker position (only used for the trace log),
/// 2. asks TheGraph for the last transaction and idles if there is none,
/// 3. reads the current Ethereum block number through web3,
/// 4. idles while the chain head is fewer than 20 blocks ahead of the block
///    after that transaction,
/// 5. otherwise calls `update_target` with that block and marks the
///    transaction's block as finished in the tracker.
///
/// Every path ends with a sleep of `interval_ethereum` seconds.
///
/// # Errors
///
/// Returns the first error raised by the config restore, component
/// construction, the tracker, TheGraph, web3 or the handler. The loop itself
/// never finishes successfully.
async fn run_scan(
    tracker: &Tracker,
    microkv: NamespaceMicroKV,
    sender_to_extrinsics: broadcast::Sender<ToExtrinsicsMessage>,
) -> color_eyre::Result<()> {
    /// Minimum distance between the chain head and the target block before
    /// the target is handed over for affirming.
    const REORG_SAFETY_GAP: u64 = 20;

    let bridge_config: PangolinRopstenConfig = Config::restore(Names::BridgePangolinRopsten)?;
    let task_config = bridge_config.task;
    // Loop-invariant, so build it once.
    let poll_interval = std::time::Duration::from_secs(task_config.interval_ethereum);

    // the graph
    let thegraph_liketh =
        TheGraphLikeEthComponent::component(bridge_config.thegraph, LikethChain::Ropsten)?;
    let web3 = Web3Component::component(bridge_config.web3)?;

    let handler = AffirmHandler::new(microkv, sender_to_extrinsics).await;

    loop {
        // `from` and `limit` only feed the trace line below.
        let from = tracker.current().await?;
        let limit = 30usize;

        tracing::trace!(
            target: "pangolin-ropsten",
            "[ropsten] [affirm] [scan] Track affirm block: {} and limit: {}",
            from,
            limit
        );

        let latest = match thegraph_liketh.last_transaction().await? {
            Some(tx) => tx,
            None => {
                tracing::info!(
                    target: "pangolin-ropsten",
                    "[ropsten] [affirm] [scan] Not found any transactions to affirm"
                );
                tokio::time::sleep(poll_interval).await;
                continue;
            }
        };

        let last_eth_block_number = web3.eth().block_number().await?.as_u64();
        let next_block_number = latest.block_number + 1;

        // Waiting for some blocks, to offset the reorg risk.
        // `checked_sub` yields `None` when the head is still behind the target,
        // which counts as "not confirmed" as well.
        let confirmed = last_eth_block_number
            .checked_sub(next_block_number)
            .map_or(false, |gap| gap >= REORG_SAFETY_GAP);
        if !confirmed {
            tracing::info!(
                target: "pangolin-ropsten",
                "[ropsten] [affirm] [scan] Waiting for some blocks, to offset the reorg risk",
            );
            tokio::time::sleep(poll_interval).await;
            continue;
        }

        handler.update_target(next_block_number)?;
        // NOTE(review): `as usize` truncates if `usize` is narrower than the
        // block number type on the target platform - confirm this is acceptable.
        tracker.finish(latest.block_number as usize)?;
        tokio::time::sleep(poll_interval).await;
    }
}