1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
use lifeline::dyn_bus::DynBus;
use lifeline::{Bus, Lifeline, Service, Task};
use postage::broadcast;
use thegraph_liketh::component::TheGraphLikeEthComponent;
use thegraph_liketh::types::LikethChain;

use component_state::state::BridgeState;
use support_common::config::{Config, Names};
use support_lifeline::service::BridgeService;
use support_tracker::Tracker;

use crate::bridge::PangolinRopstenBus;
use crate::bridge::PangolinRopstenTask;
use crate::bridge::TaskConfig;
use crate::bridge::{PangolinRopstenConfig, ToExtrinsicsMessage};
use crate::service::redeem::handler::RedeemHandler;

mod handler;

#[derive(Debug)]
pub struct RedeemService {
    _greet_scan: Lifeline,
}

impl BridgeService for RedeemService {}

impl Service for RedeemService {
    type Bus = PangolinRopstenBus;
    type Lifeline = color_eyre::Result<Self>;

    #[allow(irrefutable_let_patterns)]
    fn spawn(bus: &Self::Bus) -> Self::Lifeline {
        // Datastore
        let state = bus.storage().clone_resource::<BridgeState>()?;
        let microkv = state.microkv_with_namespace(PangolinRopstenTask::name());
        let tracker = Tracker::new(microkv, "scan.ropsten.redeem");

        // Receiver & Sender
        let sender_to_extrinsics_scan = bus.tx::<ToExtrinsicsMessage>()?;

        let _greet_scan = Self::try_task(
            &format!("{}-service-redeem-scan", PangolinRopstenTask::name()),
            async move {
                start_scan(tracker.clone(), sender_to_extrinsics_scan.clone()).await;
                Ok(())
            },
        );

        Ok(Self { _greet_scan })
    }
}

async fn start_scan(
    tracker: Tracker,
    sender_to_extrinsics: broadcast::Sender<ToExtrinsicsMessage>,
) {
    while let Err(err) = run_scan(&tracker, sender_to_extrinsics.clone()).await {
        tracing::error!(
            target: "pangolin-ropsten",
            "[ropsten] [redeem] redeem err {:?}",
            err
        );
        tokio::time::sleep(std::time::Duration::from_secs(10)).await;
    }
}

async fn run_scan(
    tracker: &Tracker,
    sender_to_extrinsics: broadcast::Sender<ToExtrinsicsMessage>,
) -> color_eyre::Result<()> {
    let bridge_config: PangolinRopstenConfig = Config::restore(Names::BridgePangolinRopsten)?;

    // task config
    let task_config: TaskConfig = bridge_config.task;

    // the graph
    let thegraph_liketh =
        TheGraphLikeEthComponent::component(bridge_config.thegraph, LikethChain::Ropsten)?;

    let mut handler = RedeemHandler::new(sender_to_extrinsics.clone()).await;
    loop {
        let from = tracker.current().await?;
        let limit = 10usize;

        tracing::trace!(
            target: "pangolin-ropsten",
            "[ropsten] [redeem] Track redeem block: {} and limit: {}",
            from,
            limit
        );
        let txs = thegraph_liketh
            .query_transactions(from as u64, limit as u32, true)
            .await?;
        if txs.is_empty() {
            tracing::info!(
                target: "pangolin-ropsten",
                "[ropsten] [redeem] Not found any transactions to redeem"
            );
            tokio::time::sleep(std::time::Duration::from_secs(
                task_config.interval_ethereum,
            ))
            .await;
            continue;
        }
        tracing::debug!(
            target: "pangolin-ropsten",
            "[ropsten] [redeem] Found {} transactions wait to redeem",
            txs.len()
        );

        let mut latest_redeem_block_number = None;
        // send transactions to redeem
        'for_tx: for tx in &txs {
            let mut times = 0;
            'loop_redeem: loop {
                match handler.redeem(tx.clone()).await {
                    Ok(Some(latest)) => {
                        tracing::trace!(
                            target: "pangolin-ropsten",
                            "[ropsten] [redeem] [{}] Change latest redeemed block number to: {}",
                            times,
                            latest,
                        );
                        latest_redeem_block_number = Some(latest);
                        break 'loop_redeem;
                    }
                    Ok(None) => {
                        tracing::trace!(
                            target: "pangolin-ropsten",
                            "[ropsten] [redeem] [{}] Latest redeemed block number is: {:?}",
                            times,
                            latest_redeem_block_number,
                        );
                        break 'for_tx;
                    }
                    Err(e) => {
                        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
                        tracing::warn!(
                            target: "pangolin-ropsten",
                            "[ropsten] [redeem] [{}] Redeem failed will be try again. tx: {:?}, err: {:?} ",
                            times,
                            tx,
                            e,
                        );
                        times += 1;
                        if times > 10 {
                            tracing::error!(
                                target: "pangolin-ropsten",
                                "[ropsten] [redeem] [{}] Failed to send redeem message. tx: {:?}, err: {:?}",
                                times,
                                tx,
                                e,
                            );
                            break 'for_tx;
                        }
                        handler = RedeemHandler::new(sender_to_extrinsics.clone()).await;
                    }
                }
            }
        }

        if latest_redeem_block_number.is_none() {
            tracing::warn!(
                target: "pangolin-ropsten",
                "[ropsten] [redeem] Not have any block redeemed. please wait affirm"
            );
            tokio::time::sleep(std::time::Duration::from_secs(
                task_config.interval_ethereum,
            ))
            .await;
            continue;
        }

        let latest = latest_redeem_block_number.unwrap();
        tracing::info!(
            target: "pangolin-ropsten",
            "[ropsten] [redeem] Set scan redeem block number to: {}",
            latest
        );
        tracker.finish(latest as usize)?;
        tokio::time::sleep(std::time::Duration::from_secs(
            task_config.interval_ethereum,
        ))
        .await;
    }
}