casper_rust_wasm_sdk/sdk/watcher/
watcher.rs

1use crate::SDK;
2use chrono::{Duration, Utc};
3use futures_util::StreamExt;
4#[cfg(target_arch = "wasm32")]
5use gloo_utils::format::JsValueSerdeExt;
6#[cfg(target_arch = "wasm32")]
7use js_sys::Promise;
8use serde::{Deserialize, Deserializer, Serialize};
9use serde_json::Value;
10use std::{
11    fmt,
12    sync::{Arc, Mutex},
13};
14use wasm_bindgen::prelude::*;
15#[cfg(target_arch = "wasm32")]
16use wasm_bindgen_futures::future_to_promise;
17
/// Timeout, in milliseconds, that `Watcher::new` applies when the caller does not supply one.
const DEFAULT_TIMEOUT_MS: u64 = 60000;
19
20impl SDK {
21    /// Creates a new Watcher instance to watch deploys.
22    /// Legacy alias
23    ///
24    /// # Arguments
25    ///
26    /// * `events_url` - The URL to monitor for transaction events.
27    /// * `timeout_duration` - An optional timeout duration in seconds.
28    ///
29    /// # Returns
30    ///
31    /// A `Watcher` instance.
32    #[deprecated(note = "prefer 'watch_transaction'")]
33    pub fn watch_deploy(&self, events_url: &str, timeout_duration: Option<u64>) -> Watcher {
34        Watcher::new(events_url.to_string(), timeout_duration)
35    }
36
37    /// Creates a new Watcher instance to watch deploys.
38    ///
39    /// # Arguments
40    ///
41    /// * `events_url` - The URL to monitor for transaction events.
42    /// * `timeout_duration` - An optional timeout duration in seconds.
43    ///
44    /// # Returns
45    ///
46    /// A `Watcher` instance.
47    pub fn watch_transaction(&self, events_url: &str, timeout_duration: Option<u64>) -> Watcher {
48        Watcher::new(events_url.to_string(), timeout_duration)
49    }
50
51    /// Waits for a deploy event to be processed asynchronously.
52    /// Legacy alias
53    ///
54    /// # Arguments
55    ///
56    /// * `events_url` - The URL to monitor for transaction events.
57    /// * `deploy_hash` - The deploy hash to wait for.
58    /// * `timeout_duration` - An optional timeout duration in milliseconds.
59    ///
60    /// # Returns
61    ///
62    /// A `Result` containing either the processed `EventParseResult` or an error message.
63    #[deprecated(note = "prefer 'wait_transaction' with transaction")]
64    pub async fn wait_deploy(
65        &self,
66        events_url: &str,
67        deploy_hash: &str,
68        timeout_duration: Option<u64>,
69    ) -> Result<EventParseResult, String> {
70        Self::wait_transaction_internal(
71            events_url.to_string(),
72            deploy_hash.to_string(),
73            timeout_duration,
74        )
75        .await
76    }
77
78    /// Alias for wait_deploy Waits for a deploy event to be processed asynchronously.
79    ///
80    /// # Arguments
81    ///
82    /// * `events_url` - The URL to monitor for transaction events.
83    /// * `target_hash` - The transaction hash to wait for.
84    /// * `timeout_duration` - An optional timeout duration in milliseconds.
85    ///
86    /// # Returns
87    ///
88    /// A `Result` containing either the processed `EventParseResult` or an error message
89    pub async fn wait_transaction(
90        &self,
91        events_url: &str,
92        target_hash: &str,
93        timeout_duration: Option<u64>,
94    ) -> Result<EventParseResult, String> {
95        Self::wait_transaction_internal(
96            events_url.to_string(),
97            target_hash.to_string(),
98            timeout_duration,
99        )
100        .await
101    }
102
103    /// Internal function to wait for a deploy event.
104    ///
105    /// # Arguments
106    ///
107    /// * `events_url` - The URL to monitor for transaction events.
108    /// * `target_hash` - The transaction hash to wait for.
109    /// * `timeout_duration` - An optional timeout duration in milliseconds.
110    ///
111    /// # Returns
112    ///
113    /// A `Result` containing either the processed `EventParseResult` or an error message.
114    async fn wait_transaction_internal(
115        events_url: String,
116        target_hash: String,
117        timeout_duration: Option<u64>,
118    ) -> Result<EventParseResult, String> {
119        let watcher = Watcher::new(events_url, timeout_duration);
120        let result = watcher.start_internal(Some(target_hash)).await;
121        match result {
122            Some(event_parse_results) => {
123                if let Some(event_parse_result) = event_parse_results.first() {
124                    return Ok(event_parse_result.clone());
125                }
126                Err("No first event result".to_string())
127            }
128            None => Err("No event result found".to_string()),
129        }
130    }
131}
132
133#[wasm_bindgen]
134impl SDK {
135    /// Creates a new Watcher instance to watch deploys (JavaScript-friendly).
136    /// Legacy alias
137    ///
138    /// # Arguments
139    ///
140    /// * `events_url` - The URL to monitor for transaction events.
141    /// * `timeout_duration` - An optional timeout duration in seconds.
142    ///
143    /// # Returns
144    ///
145    /// A `Watcher` instance.
146    #[cfg(target_arch = "wasm32")]
147    #[wasm_bindgen(js_name = "watchDeploy")]
148    #[deprecated(note = "prefer 'watchTransaction'")]
149    #[allow(deprecated)]
150    pub fn watch_deploy_js_alias(
151        &self,
152        events_url: &str,
153        timeout_duration: Option<u32>,
154    ) -> Watcher {
155        self.watch_deploy(events_url, timeout_duration.map(Into::into))
156    }
157
158    /// Creates a new Watcher instance to watch deploys (JavaScript-friendly).
159    ///
160    /// # Arguments
161    ///
162    /// * `events_url` - The URL to monitor for transaction events.
163    /// * `timeout_duration` - An optional timeout duration in seconds.
164    ///
165    /// # Returns
166    ///
167    /// A `Watcher` instance.
168    #[cfg(target_arch = "wasm32")]
169    #[wasm_bindgen(js_name = "watchTransaction")]
170    pub fn watch_transaction_js_alias(
171        &self,
172        events_url: &str,
173        timeout_duration: Option<u32>,
174    ) -> Watcher {
175        self.watch_transaction(events_url, timeout_duration.map(Into::into))
176    }
177
178    /// Waits for a deploy event to be processed asynchronously (JavaScript-friendly).
179    /// Legacy alias
180    ///
181    /// # Arguments
182    ///
183    /// * `events_url` - The URL to monitor for transaction events.
184    /// * `deploy_hash` - The deploy hash to wait for.
185    /// * `timeout_duration` - An optional timeout duration in seconds.
186    ///
187    /// # Returns
188    ///
189    /// A JavaScript `Promise` resolving to either the processed `EventParseResult` or an error message.
190    #[cfg(target_arch = "wasm32")]
191    #[wasm_bindgen(js_name = "waitDeploy")]
192    #[deprecated(note = "prefer 'waitTransaction' with transaction")]
193    #[allow(deprecated)]
194    pub async fn wait_deploy_js_alias(
195        &self,
196        events_url: &str,
197        deploy_hash: &str,
198        timeout_duration: Option<u32>,
199    ) -> Promise {
200        self.wait_transaction_js_alias(events_url, deploy_hash, timeout_duration)
201            .await
202    }
203
204    /// Waits for a deploy event to be processed asynchronously (JavaScript-friendly).
205    ///
206    /// # Arguments
207    ///
208    /// * `events_url` - The URL to monitor for transaction events.
209    /// * `target_hash` - The transaction hash to wait for.
210    /// * `timeout_duration` - An optional timeout duration in seconds.
211    ///
212    /// # Returns
213    ///
214    /// A JavaScript `Promise` resolving to either the processed `EventParseResult` or an error message.
215    #[cfg(target_arch = "wasm32")]
216    #[wasm_bindgen(js_name = "waitTransaction")]
217    pub async fn wait_transaction_js_alias(
218        &self,
219        events_url: &str,
220        target_hash: &str,
221        timeout_duration: Option<u32>,
222    ) -> Promise {
223        let events_url = events_url.to_string();
224        let target_hash = target_hash.to_string();
225        let future = async move {
226            let result = Self::wait_transaction_internal(
227                events_url,
228                target_hash,
229                timeout_duration.map(Into::into),
230            )
231            .await;
232            match result {
233                Ok(event_parse_result) => JsValue::from_serde(&event_parse_result)
234                    .map_err(|err| JsValue::from_str(&format!("{err}"))),
235                Err(err) => Err(JsValue::from_str(&err)),
236            }
237        };
238
239        future_to_promise(future)
240    }
241}
242
/// Represents a watcher responsible for monitoring transaction events.
///
/// This struct allows clients to subscribe to transaction events, start watching for events,
/// or wait for an event and handle the received event data.
///
/// # Fields
///
/// * `events_url` - The URL of the event stream.
/// * `subscriptions` - Vector containing the registered subscriptions.
/// * `active` - Shared flag (`Arc<Mutex<bool>>`) telling whether the watcher is active; it is
///   shared between clones of the watcher.
/// * `timeout_duration` - Maximum time to keep watching, built from milliseconds in `new`.
#[derive(Clone)]
#[wasm_bindgen]
pub struct Watcher {
    events_url: String,
    subscriptions: Vec<Subscription>,
    active: Arc<Mutex<bool>>,
    timeout_duration: Duration,
}
262
263#[wasm_bindgen]
264impl Watcher {
265    /// Creates a new `Watcher` instance.
266    ///
267    /// # Arguments
268    ///
269    /// * `events_url` - The URL for transaction events.
270    /// * `timeout_duration` - Optional duration in milliseconds for watching events. If not provided,
271    ///   a default timeout of 60,000 milliseconds (1 minute) is used.
272    ///
273    /// # Returns
274    ///
275    /// A new `Watcher` instance.
276    #[wasm_bindgen(constructor)]
277    pub fn new(events_url: String, timeout_duration: Option<u64>) -> Self {
278        let timeout_duration = Duration::try_milliseconds(
279            timeout_duration
280                .unwrap_or(DEFAULT_TIMEOUT_MS)
281                .try_into()
282                .unwrap(),
283        )
284        .unwrap_or_default();
285
286        Watcher {
287            events_url,
288            subscriptions: Vec::new(),
289            active: Arc::new(Mutex::new(true)),
290            timeout_duration,
291        }
292    }
293
294    /// Subscribes to transaction events.
295    ///
296    /// # Arguments
297    ///
298    /// * `subscriptions` - Vector of deploy subscriptions to be added.
299    ///
300    /// # Returns
301    ///
302    /// Result indicating success or an error message.
303    #[cfg(target_arch = "wasm32")]
304    #[wasm_bindgen(js_name = "subscribe")]
305    pub fn subscribe_js_alias(&mut self, subscriptions: Vec<Subscription>) -> Result<(), String> {
306        self.subscribe(subscriptions)
307    }
308
309    /// Unsubscribes from transaction events based on the provided transaction hash.
310    ///
311    /// # Arguments
312    ///
313    /// * `transaction_hash` - The transaction hash to unsubscribe.
314    ///
315    /// This method removes the deploy subscription associated with the provided transaction hash.
316    #[wasm_bindgen]
317    pub fn unsubscribe(&mut self, target_hash: String) {
318        self.subscriptions.retain(|s| s.target_hash != target_hash);
319    }
320
321    /// Starts watching for transaction events (JavaScript-friendly).
322    ///
323    /// # Returns
324    ///
325    /// Result containing the serialized transaction events data or an error message.
326    #[cfg(target_arch = "wasm32")]
327    #[wasm_bindgen(js_name = "start")]
328    pub async fn start_js_alias(&self) -> Result<JsValue, JsError> {
329        let result = match self.start_internal(None).await {
330            Some(res) => res,
331            None => return Ok(JsValue::NULL),
332        };
333
334        let serialized = JsValue::from_serde(&result)
335            .map_err(|err| JsError::new(&format!("Error serializing events: {err:?}")))?;
336
337        Ok(serialized)
338    }
339
340    /// Stops watching for transaction events.
341    ///
342    /// This method sets the deploy watcher as inactive and stops the event listener if it exists.
343    #[wasm_bindgen]
344    pub fn stop(&self) {
345        {
346            let mut active = self.active.lock().unwrap();
347            *active = false;
348        }
349    }
350}
351
352impl Watcher {
353    /// Asynchronously starts watching for transaction events and execute callback handler functions from deploy subscriptions
354    ///
355    /// # Returns
356    ///
357    /// An `Option` containing the serialized deploy event data or `None` if no events are received.
358    pub async fn start(&self) -> Option<Vec<EventParseResult>> {
359        self.start_internal(None).await
360    }
361
362    /// Asynchronously starts watching for transaction events
363    ///
364    /// # Arguments
365    ///
366    /// * `transaction_hash` - Optional transaction hash to directly return processed event. If provided, it directly returns matched events without executing callback handler functions from deploy subscriptions. If `None`, it executes callback handler functions from deploy subscriptions.
367    ///
368    /// # Returns
369    ///
370    /// An `Option` containing the serialized deploy event data or `None` if no events are received.
371    async fn start_internal(&self, target_hash: Option<String>) -> Option<Vec<EventParseResult>> {
372        {
373            let mut active = self.active.lock().unwrap();
374            *active = true;
375        }
376
377        let client = reqwest::Client::new();
378        let url = self.events_url.clone();
379
380        // TODO fix this warning
381        // https://github.com/rust-lang/rust-clippy/issues/11034
382        #[allow(clippy::arc_with_non_send_sync)]
383        let watcher = Arc::new(Mutex::new(self.clone()));
384
385        let start_time = Utc::now();
386        let timeout_duration = self.timeout_duration;
387
388        let response = match client.get(&url).send().await {
389            Ok(res) => res,
390            Err(err) => {
391                let err = err.to_string();
392                let event_parse_result = EventParseResult {
393                    err: Some(err.to_string()),
394                    body: None,
395                };
396                return Some([event_parse_result].to_vec());
397            }
398        };
399
400        if response.status().is_success() {
401            let buffer_size = 1;
402            let mut buffer = Vec::with_capacity(buffer_size);
403
404            let mut bytes_stream = response.bytes_stream();
405            while let Some(chunk) = bytes_stream.next().await {
406                match chunk {
407                    Ok(bytes) => {
408                        let this_clone = Arc::clone(&watcher);
409                        if !*this_clone.lock().unwrap().active.lock().unwrap() {
410                            return None;
411                        }
412
413                        if Utc::now() - start_time >= timeout_duration {
414                            let event_parse_result = EventParseResult {
415                                err: Some("Timeout expired".to_string()),
416                                body: None,
417                            };
418                            return Some([event_parse_result].to_vec());
419                        }
420
421                        buffer.extend_from_slice(&bytes);
422
423                        while let Some(index) = buffer.iter().position(|&b| b == b'\n') {
424                            let message = buffer.drain(..=index).collect::<Vec<_>>();
425
426                            if let Ok(message) = std::str::from_utf8(&message) {
427                                let watcher_guard = this_clone.lock().unwrap();
428                                let result = watcher_guard
429                                    .clone()
430                                    .process_events(message, target_hash.as_deref());
431                                match result {
432                                    Some(event_parse_result) => return Some(event_parse_result),
433                                    None => {
434                                        continue;
435                                    }
436                                };
437                            } else {
438                                let event_parse_result = EventParseResult {
439                                    err: Some("Error decoding UTF-8 data".to_string()),
440                                    body: None,
441                                };
442                                return Some([event_parse_result].to_vec());
443                            }
444                        }
445                    }
446                    Err(err) => {
447                        let event_parse_result = EventParseResult {
448                            err: Some(format!("Error reading chunk: {err}")),
449                            body: None,
450                        };
451                        return Some([event_parse_result].to_vec());
452                    }
453                }
454            }
455        } else {
456            let event_parse_result = EventParseResult {
457                err: Some("Failed to fetch stream".to_string()),
458                body: None,
459            };
460            return Some([event_parse_result].to_vec());
461        }
462        None
463    }
464
465    /// Subscribes to transaction events.
466    ///
467    /// # Arguments
468    ///
469    /// * `subscriptions` - Vector of subscriptions to be added.
470    ///
471    /// # Returns
472    ///
473    /// Result indicating success or an error message.
474    pub fn subscribe(&mut self, subscriptions: Vec<Subscription>) -> Result<(), String> {
475        for new_subscription in &subscriptions {
476            if self
477                .subscriptions
478                .iter()
479                .any(|s| s.target_hash == new_subscription.target_hash)
480            {
481                return Err(String::from("Already subscribed to this event"));
482            }
483        }
484        self.subscriptions.extend(subscriptions);
485        Ok(())
486    }
487
488    /// Processes events received from the stream and notifies subscribers.
489    ///
490    /// # Arguments
491    ///
492    /// * `message` - The raw message received from the event stream.
493    /// * `target_transaction_hash` - Optional transaction hash to directly return. If provided, it directly returns matched events without executing callback handler functions from subscriptions. If `None`, it executes callback handler functions from subscriptions.
494    ///
495    /// # Returns
496    ///
497    /// An `Option` containing the serialized transaction/deploy event data or `None` if an error occurs.
498    fn process_events(
499        mut self,
500        message: &str,
501        target_hash: Option<&str>,
502    ) -> Option<Vec<EventParseResult>> {
503        let data_stream = Self::extract_data_stream(message);
504
505        for data_item in data_stream {
506            let trimmed_item = data_item.trim();
507            let transaction_processed_str = EventName::TransactionProcessed.to_string();
508
509            if !trimmed_item.contains(&transaction_processed_str) {
510                continue;
511            }
512
513            if let Ok(parsed_json) = serde_json::from_str::<Value>(trimmed_item) {
514                let transaction = parsed_json.get(transaction_processed_str);
515                if let Some(transaction_processed) =
516                    transaction.and_then(|transaction| transaction.as_object())
517                {
518                    if let Some(transaction_hash_processed) = transaction_processed
519                        .get("transaction_hash")
520                        .and_then(|transaction_hash| {
521                            transaction_hash
522                                .get("Version1")
523                                .or_else(|| transaction_hash.get("Deploy"))
524                                .and_then(|transaction_hash| transaction_hash.as_str())
525                        })
526                    {
527                        let mut transaction_hash_found =
528                            target_hash == Some(transaction_hash_processed);
529
530                        let transaction_processed: Option<TransactionProcessed> =
531                            serde_json::from_value(transaction.unwrap().clone()).ok();
532
533                        let body = Some(Body {
534                            transaction_processed,
535                        });
536
537                        let event_parse_result = EventParseResult { err: None, body };
538
539                        if transaction_hash_found {
540                            self.unsubscribe(target_hash.unwrap().to_string());
541                            self.stop();
542                            return Some([event_parse_result].to_vec());
543                        }
544
545                        let mut results: Vec<EventParseResult> = [].to_vec();
546                        for subscription in self.subscriptions.clone().iter() {
547                            if transaction_hash_processed == subscription.target_hash {
548                                let event_handler = &subscription.event_handler_fn;
549
550                                #[cfg(not(target_arch = "wasm32"))]
551                                {
552                                    event_handler.call(event_parse_result.clone());
553                                }
554                                #[cfg(target_arch = "wasm32")]
555                                {
556                                    let this = JsValue::null();
557                                    let args = js_sys::Array::new();
558                                    args.push(
559                                        &JsValue::from_serde(&event_parse_result.clone()).unwrap(),
560                                    );
561                                    event_handler.apply(&this, &args).unwrap();
562                                }
563
564                                self.unsubscribe(transaction_hash_processed.to_string());
565                                transaction_hash_found = true;
566                                results.push(event_parse_result.clone())
567                            }
568                        }
569
570                        if transaction_hash_found && self.subscriptions.is_empty() {
571                            self.stop();
572                            return Some(results);
573                        }
574                    }
575                }
576            } else {
577                let event_parse_result = EventParseResult {
578                    err: Some("Failed to parse JSON data.".to_string()),
579                    body: None,
580                };
581                return Some([event_parse_result].to_vec());
582            }
583        }
584        None
585    }
586
587    /// Extracts the data stream from the raw JSON data.
588    ///
589    /// # Arguments
590    ///
591    /// * `json_data` - The raw JSON data containing the data stream.
592    ///
593    /// # Returns
594    ///
595    /// A vector of data items within the data stream.
596    fn extract_data_stream(json_data: &str) -> Vec<&str> {
597        let data_stream: Vec<&str> = json_data
598            .split("data:")
599            .filter(|s| !s.is_empty())
600            .map(|s| s.split("id:").next().unwrap_or(""))
601            .collect();
602        data_stream
603    }
604}
605
606/// A wrapper for an event handler function, providing synchronization and cloning capabilities.
607pub struct EventHandlerFn(Arc<Mutex<dyn Fn(EventParseResult) + Send + Sync>>);
608
609#[allow(dead_code)]
610impl EventHandlerFn {
611    /// Creates a new `EventHandlerFn` with the specified event handling function.
612    ///
613    /// # Arguments
614    ///
615    /// * `func` - A function that takes an `EventParseResult` as an argument.
616    ///
617    /// # Returns
618    ///
619    /// A new `EventHandlerFn` instance.
620    pub fn new<F>(func: F) -> Self
621    where
622        F: Fn(EventParseResult) + Send + Sync + 'static,
623    {
624        EventHandlerFn(Arc::new(Mutex::new(func)))
625    }
626
627    /// Calls the stored event handling function with the provided `EventParseResult`.
628    ///
629    /// # Arguments
630    ///
631    /// * `event_result` - The result of an event to be passed to the stored event handling function.
632    pub fn call(&self, event_result: EventParseResult) {
633        let func = self.0.lock().unwrap();
634        (*func)(event_result); // Call the stored function with arguments
635    }
636}
637
638impl fmt::Debug for EventHandlerFn {
639    /// Implements the `Debug` trait for better debugging support.
640    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
641        write!(f, "EventHandlerFn")
642    }
643}
644
645impl Clone for EventHandlerFn {
646    /// Implements the `Clone` trait for creating a cloned instance with shared underlying data.
647    fn clone(&self) -> Self {
648        EventHandlerFn(self.0.clone())
649    }
650}
651
652impl Default for EventHandlerFn {
653    /// Implements the `Default` trait, creating a default instance with a no-op event handling function.
654    fn default() -> Self {
655        EventHandlerFn(Arc::new(Mutex::new(|_event_result| {})))
656    }
657}
658
// `Subscription` is defined twice, once per target architecture: the handler is a Rust
// `EventHandlerFn` on non-wasm32 targets and a `js_sys::Function` on wasm32.
#[cfg(not(target_arch = "wasm32"))]
/// Represents a subscription to transaction events for non-wasm32 target architecture.
#[derive(Debug, Clone, Default)]
pub struct Subscription {
    /// Transaction target hash to identify the subscription.
    pub target_hash: String,
    /// Handler function invoked with the event result when the target hash is processed.
    pub event_handler_fn: EventHandlerFn,
}
669
#[cfg(target_arch = "wasm32")]
/// Represents a subscription to transaction events for wasm32 target architecture.
#[derive(Debug, Clone, Default)]
#[wasm_bindgen(getter_with_clone)]
pub struct Subscription {
    /// Transaction target hash to identify the subscription (exposed to JS as `targetHash`).
    #[wasm_bindgen(js_name = "targetHash")]
    pub target_hash: String,
    /// JavaScript handler function for transaction events (exposed to JS as `eventHandlerFn`).
    #[wasm_bindgen(js_name = "eventHandlerFn")]
    pub event_handler_fn: js_sys::Function,
}
682
impl Subscription {
    /// Constructor for Subscription for non-wasm32 target architecture.
    ///
    /// # Arguments
    ///
    /// * `target_hash` - Transaction target hash to identify the subscription.
    /// * `event_handler_fn` - Handler function for transaction events.
    ///
    /// # Returns
    ///
    /// A `Subscription` holding both values as given.
    #[cfg(not(target_arch = "wasm32"))]
    pub fn new(target_hash: String, event_handler_fn: EventHandlerFn) -> Self {
        Self {
            target_hash,
            event_handler_fn,
        }
    }
}
698
#[wasm_bindgen]
impl Subscription {
    /// Constructor for Subscription for wasm32 target architecture.
    ///
    /// # Arguments
    ///
    /// * `target_hash` - Transaction target hash to identify the subscription.
    /// * `event_handler_fn` - JavaScript handler function for transaction events.
    ///
    /// # Returns
    ///
    /// A `Subscription` holding both values as given.
    #[cfg(target_arch = "wasm32")]
    #[wasm_bindgen(constructor)]
    pub fn new(target_hash: String, event_handler_fn: js_sys::Function) -> Self {
        Self {
            target_hash,
            event_handler_fn,
        }
    }
}
716
/// Represents a failure response containing a cost and an error message.
#[derive(Debug, Deserialize, Clone, Default, Serialize)]
#[wasm_bindgen(getter_with_clone)]
pub struct Failure {
    /// Cost reported with the failure, kept as a string.
    pub cost: String,
    /// Error message describing the failure.
    pub error_message: String,
}
724
/// Represents the `Version2` execution result payload.
///
/// All amounts are kept as strings, exactly as they are deserialized.
#[derive(Debug, Deserialize, Clone, Default, Serialize)]
#[wasm_bindgen(getter_with_clone)]
pub struct Version2 {
    /// Public key of the initiator.
    pub initiator: PublicKeyString,
    /// Optional error message; NOTE(review): presumably absent when execution succeeded — confirm.
    pub error_message: Option<String>,
    /// The `limit` value of the payload.
    pub limit: String,
    /// The `consumed` value of the payload.
    pub consumed: String,
    /// The `cost` value of the payload.
    pub cost: String,
    // pub payment: Vec<Payment>,
}
736
/// Represents a payment entry identified by its source.
///
/// Not referenced by any active field in the part of the file visible here (the `payment`
/// field of `Version2` is commented out).
#[derive(Debug, Deserialize, Clone, Default, Serialize)]
#[wasm_bindgen(getter_with_clone)]
pub struct Payment {
    /// The `source` value of the payment.
    pub source: String,
}
742
/// Represents the result of an execution, either Success or Failure.
#[derive(Debug, Deserialize, Clone, Default, Serialize)]
#[wasm_bindgen(getter_with_clone)]
pub struct ExecutionResult {
    /// Optional Success information; (de)serialized under the `Version2` key and exposed to
    /// JavaScript as `Success`.
    #[serde(rename = "Version2")]
    #[wasm_bindgen(js_name = "Success")]
    pub success: Option<Version2>,
    /// Optional Failure information; (de)serialized under the `Failure` key.
    #[serde(rename = "Failure")]
    #[wasm_bindgen(js_name = "Failure")]
    pub failure: Option<Failure>,
}
756
/// A transaction or deploy hash kept as a plain string.
///
/// Deserialization is implemented by hand (see the `Deserialize` impl) so that both the
/// `Version1` and the `Deploy` representation are accepted.
#[derive(Debug, Clone, Serialize, Default)]
#[wasm_bindgen(getter_with_clone)]
pub struct HashString {
    /// The hash value.
    pub hash: String,
}

impl HashString {
    /// Wraps an already extracted hash string.
    fn from_hash(hash: String) -> Self {
        HashString { hash }
    }
}
768
769impl<'de> Deserialize<'de> for HashString {
770    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
771    where
772        D: Deserializer<'de>,
773    {
774        let map: std::collections::HashMap<String, String> =
775            Deserialize::deserialize(deserializer)?;
776
777        if let Some(hash) = map.get("Version1").or_else(|| map.get("Deploy")) {
778            Ok(HashString::from_hash(hash.clone()))
779        } else {
780            Err(serde::de::Error::missing_field("Deploy or Version1"))
781        }
782    }
783}
784
#[wasm_bindgen]
impl HashString {
    /// JavaScript getter `Deploy`: returns a copy of the hash.
    #[wasm_bindgen(getter, js_name = "Deploy")]
    pub fn deploy(&self) -> String {
        self.hash.clone()
    }

    /// JavaScript getter `Version1`: returns a copy of the same hash as `Deploy`.
    #[wasm_bindgen(getter, js_name = "Version1")]
    pub fn version1(&self) -> String {
        self.hash.clone()
    }

    /// JavaScript `toString`: returns the hash through the `Display` implementation.
    #[wasm_bindgen(js_name = "toString")]
    pub fn to_string_js(&self) -> String {
        self.to_string()
    }
}
802
803impl fmt::Display for HashString {
804    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
805        write!(f, "{}", self.hash)
806    }
807}
808
/// A public key kept as a string, (de)serialized under the `PublicKey` key.
#[derive(Debug, Deserialize, Clone, Serialize, Default)]
#[wasm_bindgen(getter_with_clone)]
pub struct PublicKeyString {
    /// The public key value (exposed to JavaScript as `PublicKey`).
    #[serde(rename = "PublicKey")]
    #[wasm_bindgen(js_name = "PublicKey")]
    pub public_key: String,
}
816
/// A message payload holding a string, (de)serialized under the `String` key.
#[derive(Debug, Deserialize, Clone, Serialize, Default)]
#[wasm_bindgen(getter_with_clone)]
pub struct Message {
    /// The message text (exposed to JavaScript as `String`).
    #[serde(rename = "String")]
    #[wasm_bindgen(js_name = "String")]
    pub string: String,
}
824
/// One entry of the `messages` list of a processed transaction.
#[derive(Debug, Deserialize, Clone, Serialize, Default)]
#[wasm_bindgen(getter_with_clone)]
pub struct Messages {
    /// The `entity_hash` value of the entry.
    pub entity_hash: String,
    /// The message payload.
    pub message: Message,
    /// The `topic_name` value of the entry.
    pub topic_name: String,
    /// The `topic_name_hash` value of the entry.
    pub topic_name_hash: String,
    /// The `topic_index` value of the entry.
    pub topic_index: u32,
    /// The `block_index` value of the entry.
    pub block_index: u64,
}
835
/// Represents processed transaction information.
#[derive(Debug, Deserialize, Clone, Default, Serialize)]
#[wasm_bindgen(getter_with_clone)]
pub struct TransactionProcessed {
    /// Hash of the transaction; also accepted under the key `transaction_hash` when
    /// deserializing.
    #[serde(alias = "transaction_hash")]
    pub hash: HashString,
    /// Public key of the initiator.
    pub initiator_addr: PublicKeyString,
    /// The `timestamp` value, kept as a string.
    pub timestamp: String,
    /// The `ttl` value, kept as a string.
    pub ttl: String,
    /// The `block_hash` value, kept as a string.
    pub block_hash: String,
    /// Result of the execution, either Success or Failure.
    pub execution_result: ExecutionResult,
    /// The `messages` list of the event.
    pub messages: Vec<Messages>,
}
850
851/// Represents the body of an event, containing processed deploy information.
852#[derive(Debug, Deserialize, Clone, Default, Serialize)]
853#[wasm_bindgen(getter_with_clone)]
854pub struct Body {
855    #[serde(rename = "TransactionProcessed")]
856    pub transaction_processed: Option<TransactionProcessed>,
857}
858
859// Implementing methods to get the field using different aliases
860#[wasm_bindgen]
861impl Body {
862    #[wasm_bindgen(getter, js_name = "get_deploy_processed")]
863    #[deprecated(note = "prefer 'get_transaction_processed'")]
864    #[allow(deprecated)]
865    pub fn get_deploy_processed(&self) -> Option<TransactionProcessed> {
866        self.transaction_processed.clone()
867    }
868
869    #[wasm_bindgen(getter, js_name = "get_transaction_processed")]
870    pub fn get_transaction_processed(&self) -> Option<TransactionProcessed> {
871        self.transaction_processed.clone()
872    }
873}
874
875/// Represents the result of parsing an event, containing error information and the event body.
876#[derive(Debug, Deserialize, Clone, Default, Serialize)]
877#[wasm_bindgen(getter_with_clone)]
878pub struct EventParseResult {
879    pub err: Option<String>,
880    pub body: Option<Body>,
881}
882
883/// Enum representing different event names.
884#[derive(Debug, Deserialize, Clone, Serialize)]
885enum EventName {
886    BlockAdded,
887    TransactionAccepted,
888    TransactionExpired,
889    TransactionProcessed,
890    Step,
891    FinalitySignature,
892    Fault,
893}
894
895impl fmt::Display for EventName {
896    /// Implements the `fmt::Display` trait for converting the enum variant to its string representation.
897    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
898        match self {
899            EventName::BlockAdded => write!(f, "BlockAdded"),
900            EventName::TransactionAccepted => write!(f, "TransactionAccepted"),
901            EventName::TransactionExpired => write!(f, "TransactionExpired"),
902            EventName::TransactionProcessed => write!(f, "TransactionProcessed"),
903            EventName::Step => write!(f, "Step"),
904            EventName::FinalitySignature => write!(f, "FinalitySignature"),
905            EventName::Fault => write!(f, "Fault"),
906        }
907    }
908}
909
910#[cfg(test)]
911mod tests {
912    use super::*;
913    use crate::watcher::{deploy_mock::DEPLOY_MOCK, transaction_mock::TRANSACTION_MOCK};
914    use sdk_tests::tests::helpers::get_network_constants;
915    use tokio;
916
917    #[test]
918    fn test_new() {
919        // Arrange
920        let (_, events_url, _, _, _) = get_network_constants();
921        let timeout_duration = 5000;
922
923        // Act
924        let watcher = Watcher::new(events_url.clone(), Some(timeout_duration));
925
926        // Assert
927        assert_eq!(watcher.events_url, events_url);
928        assert_eq!(watcher.subscriptions.len(), 0);
929        assert!(*watcher.active.lock().unwrap());
930        assert_eq!(
931            watcher.timeout_duration,
932            Duration::try_milliseconds(timeout_duration.try_into().unwrap()).unwrap()
933        );
934    }
935
936    #[test]
937    fn test_new_default_timeout() {
938        // Arrange
939        let (_, events_url, _, _, _) = get_network_constants();
940
941        // Act
942        let watcher = Watcher::new(events_url.clone(), None);
943
944        // Assert
945        assert_eq!(watcher.events_url, events_url);
946        assert_eq!(watcher.subscriptions.len(), 0);
947        assert!(*watcher.active.lock().unwrap());
948        assert_eq!(
949            watcher.timeout_duration,
950            Duration::try_milliseconds(DEFAULT_TIMEOUT_MS.try_into().unwrap()).unwrap()
951        );
952    }
953
954    #[tokio::test]
955    async fn test_extract_data_stream() {
956        // Arrange
957        let json_data = r#"data:segment1id:data:segment2id:data:segment3id:"#;
958
959        // Act
960        let result = Watcher::extract_data_stream(json_data);
961
962        // Assert
963        assert_eq!(result, vec!["segment1", "segment2", "segment3"]);
964    }
965
966    #[tokio::test]
967    #[allow(deprecated)]
968    async fn test_process_events_legacy() {
969        // Arrange
970        let (_, events_url, _, _, _) = get_network_constants();
971        let watcher = Watcher::new(events_url, None);
972        let deploy_hash = "19dbf9bdcd821e55392393c74c86deede02d9434d62d0bc72ab381ce7ea1c4f2";
973
974        let target_deploy_hash = Some(deploy_hash);
975
976        // Act
977        let result = watcher.process_events(DEPLOY_MOCK, target_deploy_hash);
978
979        // Assert
980        assert!(result.is_some());
981        let results = result.unwrap();
982        assert_eq!(results.len(), 1);
983
984        let event_parse_result = &results[0];
985        assert!(event_parse_result.err.is_none());
986
987        let body = event_parse_result.body.as_ref().unwrap();
988        let get_deploy_processed = body.get_deploy_processed().unwrap();
989        assert_eq!(get_deploy_processed.hash.to_string(), deploy_hash);
990    }
991
992    #[tokio::test]
993    async fn test_process_events() {
994        // Arrange
995        let (_, events_url, _, _, _) = get_network_constants();
996        let watcher = Watcher::new(events_url, None);
997        let transaction_hash = "8c6823d9480eee9fe0cfb5ed1fbf77f928cc6af21121298c05b4e3d87a328271";
998
999        let target_transaction_hash = Some(transaction_hash);
1000
1001        // Act
1002        let result = watcher.process_events(TRANSACTION_MOCK, target_transaction_hash);
1003
1004        // Assert
1005        assert!(result.is_some());
1006        let results = result.unwrap();
1007        assert_eq!(results.len(), 1);
1008
1009        let event_parse_result = &results[0];
1010        assert!(event_parse_result.err.is_none());
1011
1012        let body = event_parse_result.body.as_ref().unwrap();
1013        let transaction_processed = body.get_transaction_processed().unwrap();
1014        assert_eq!(transaction_processed.hash.to_string(), transaction_hash);
1015    }
1016
1017    #[tokio::test]
1018    async fn test_start_timeout() {
1019        // Arrange
1020        let (_, events_url, _, _, _) = get_network_constants();
1021        let watcher = Watcher::new(events_url, Some(1));
1022
1023        // Act
1024        let result = watcher.start().await;
1025
1026        // Assert
1027        assert!(result.is_some());
1028        let results = result.unwrap();
1029        assert_eq!(results.len(), 1);
1030        assert_eq!(results[0].err, Some("Timeout expired".to_string()));
1031        assert!(results[0].body.is_none());
1032    }
1033
1034    #[test]
1035    fn test_stop() {
1036        // Arrange
1037        let (_, events_url, _, _, _) = get_network_constants();
1038        let watcher = Watcher::new(events_url, None);
1039        assert!(*watcher.active.lock().unwrap());
1040
1041        // Act
1042        watcher.stop();
1043
1044        // Assert
1045        assert!(!(*watcher.active.lock().unwrap()));
1046    }
1047
1048    #[test]
1049    fn test_subscribe() {
1050        // Arrange
1051        let (_, events_url, _, _, _) = get_network_constants();
1052        let mut watcher = Watcher::new(events_url, None);
1053        let transaction_hash = "8c6823d9480eee9fe0cfb5ed1fbf77f928cc6af21121298c05b4e3d87a328271";
1054
1055        // Create a subscription
1056        let subscription =
1057            Subscription::new(transaction_hash.to_string(), EventHandlerFn::default());
1058
1059        // Act
1060        let result = watcher.subscribe(vec![subscription]);
1061
1062        // Assert
1063        assert!(result.is_ok());
1064
1065        // Try subscribing to the same deploy hash again
1066        let duplicate_subscription =
1067            Subscription::new(transaction_hash.to_string(), EventHandlerFn::default());
1068        let result_duplicate = watcher.subscribe(vec![duplicate_subscription]);
1069
1070        // Assert
1071        assert!(result_duplicate.is_err());
1072        assert_eq!(
1073            result_duplicate.err().unwrap(),
1074            "Already subscribed to this event"
1075        );
1076    }
1077
1078    #[test]
1079    fn test_unsubscribe() {
1080        // Arrange
1081        let (_, events_url, _, _, _) = get_network_constants();
1082        let mut watcher = Watcher::new(events_url, None);
1083        let transaction_hash = "8c6823d9480eee9fe0cfb5ed1fbf77f928cc6af21121298c05b4e3d87a328271";
1084
1085        // Subscribe to a transaction hash
1086        let transaction_hash_to_subscribe = transaction_hash.to_string();
1087        let subscription = Subscription::new(
1088            transaction_hash_to_subscribe.clone(),
1089            EventHandlerFn::default(),
1090        );
1091        let _ = watcher.subscribe(vec![subscription]);
1092
1093        // Assert that the deploy hash is initially subscribed
1094        assert!(watcher
1095            .subscriptions
1096            .iter()
1097            .any(|s| s.target_hash == transaction_hash_to_subscribe));
1098
1099        // Act
1100        watcher.unsubscribe(transaction_hash_to_subscribe.clone());
1101
1102        // Assert that the deploy hash is unsubscribed after calling unsubscribe
1103        assert!(!watcher
1104            .subscriptions
1105            .iter()
1106            .any(|s| s.target_hash == transaction_hash_to_subscribe));
1107    }
1108
1109    #[test]
1110    #[allow(deprecated)]
1111    fn test_sdk_watch_deploy_retunrs_instance() {
1112        // Arrange
1113        let sdk = SDK::new(None, None, None);
1114        let (_, events_url, _, _, _) = get_network_constants();
1115        let timeout_duration = 5000;
1116
1117        // Act
1118        let watcher = sdk.watch_deploy(&events_url, Some(timeout_duration));
1119
1120        // Assert
1121        assert_eq!(watcher.events_url, events_url);
1122        assert_eq!(watcher.subscriptions.len(), 0);
1123        assert!(*watcher.active.lock().unwrap());
1124        assert_eq!(
1125            watcher.timeout_duration,
1126            Duration::try_milliseconds(timeout_duration.try_into().unwrap()).unwrap()
1127        );
1128    }
1129
1130    #[test]
1131    fn test_sdk_watch_transaction_retunrs_instance() {
1132        // Arrange
1133        let sdk = SDK::new(None, None, None);
1134        let (_, events_url, _, _, _) = get_network_constants();
1135        let timeout_duration = 5000;
1136
1137        // Act
1138        let watcher = sdk.watch_transaction(&events_url, Some(timeout_duration));
1139
1140        // Assert
1141        assert_eq!(watcher.events_url, events_url);
1142        assert_eq!(watcher.subscriptions.len(), 0);
1143        assert!(*watcher.active.lock().unwrap());
1144        assert_eq!(
1145            watcher.timeout_duration,
1146            Duration::try_milliseconds(timeout_duration.try_into().unwrap()).unwrap()
1147        );
1148    }
1149
1150    #[tokio::test]
1151    #[allow(deprecated)]
1152    async fn test_wait_deploy_timeout() {
1153        // Arrange
1154        let sdk = SDK::new(None, None, None);
1155        let (_, events_url, _, _, _) = get_network_constants();
1156        let deploy_hash = "19dbf9bdcd821e55392393c74c86deede02d9434d62d0bc72ab381ce7ea1c4f2";
1157        let timeout_duration = Some(5000);
1158
1159        // Act
1160        let result = sdk
1161            .wait_deploy(&events_url, deploy_hash, timeout_duration)
1162            .await;
1163
1164        // Assert
1165        assert!(result.is_ok());
1166        let event_parse_result = result.unwrap();
1167        assert!(event_parse_result.err.is_some());
1168        assert_eq!(event_parse_result.err, Some("Timeout expired".to_string()));
1169    }
1170
1171    #[tokio::test]
1172    async fn test_wait_transaction_timeout() {
1173        // Arrange
1174        let sdk = SDK::new(None, None, None);
1175        let (_, events_url, _, _, _) = get_network_constants();
1176        let transaction_hash = "8c6823d9480eee9fe0cfb5ed1fbf77f928cc6af21121298c05b4e3d87a328271";
1177        let timeout_duration = Some(5000);
1178
1179        // Act
1180        let result = sdk
1181            .wait_transaction(&events_url, transaction_hash, timeout_duration)
1182            .await;
1183
1184        // Assert
1185        assert!(result.is_ok());
1186        let event_parse_result = result.unwrap();
1187        assert!(event_parse_result.err.is_some());
1188        assert_eq!(event_parse_result.err, Some("Timeout expired".to_string()));
1189    }
1190}