casper_rust_wasm_sdk/sdk/watcher/
watcher.rs

1use crate::SDK;
2use chrono::{Duration, Utc};
3use futures_util::StreamExt;
4#[cfg(target_arch = "wasm32")]
5use gloo_utils::format::JsValueSerdeExt;
6#[cfg(target_arch = "wasm32")]
7use js_sys::Promise;
8use serde::{Deserialize, Deserializer, Serialize};
9use serde_json::Value;
10use std::{
11    cell::RefCell,
12    fmt,
13    rc::Rc,
14    sync::{Arc, Mutex},
15};
16use wasm_bindgen::prelude::*;
17#[cfg(target_arch = "wasm32")]
18use wasm_bindgen_futures::future_to_promise;
19
/// Timeout, in milliseconds, applied by `Watcher::new` when the caller passes `None` (one minute).
const DEFAULT_TIMEOUT_MS: u64 = 60_000;
21
22impl SDK {
23    /// Creates a new Watcher instance to watch deploys.
24    /// Legacy alias
25    ///
26    /// # Arguments
27    ///
28    /// * `events_url` - The URL to monitor for transaction events.
29    /// * `timeout_duration` - An optional timeout duration in seconds.
30    ///
31    /// # Returns
32    ///
33    /// A `Watcher` instance.
34    #[deprecated(note = "prefer 'watch_transaction'")]
35    pub fn watch_deploy(&self, events_url: &str, timeout_duration: Option<u64>) -> Watcher {
36        Watcher::new(events_url.to_string(), timeout_duration)
37    }
38
39    /// Creates a new Watcher instance to watch deploys.
40    ///
41    /// # Arguments
42    ///
43    /// * `events_url` - The URL to monitor for transaction events.
44    /// * `timeout_duration` - An optional timeout duration in seconds.
45    ///
46    /// # Returns
47    ///
48    /// A `Watcher` instance.
49    pub fn watch_transaction(&self, events_url: &str, timeout_duration: Option<u64>) -> Watcher {
50        Watcher::new(events_url.to_string(), timeout_duration)
51    }
52
53    /// Waits for a deploy event to be processed asynchronously.
54    /// Legacy alias
55    ///
56    /// # Arguments
57    ///
58    /// * `events_url` - The URL to monitor for transaction events.
59    /// * `deploy_hash` - The deploy hash to wait for.
60    /// * `timeout_duration` - An optional timeout duration in milliseconds.
61    ///
62    /// # Returns
63    ///
64    /// A `Result` containing either the processed `EventParseResult` or an error message.
65    #[deprecated(note = "prefer 'wait_transaction' with transaction")]
66    pub async fn wait_deploy(
67        &self,
68        events_url: &str,
69        deploy_hash: &str,
70        timeout_duration: Option<u64>,
71    ) -> Result<EventParseResult, String> {
72        Self::wait_transaction_internal(
73            events_url.to_string(),
74            deploy_hash.to_string(),
75            timeout_duration,
76        )
77        .await
78    }
79
80    /// Alias for wait_deploy Waits for a deploy event to be processed asynchronously.
81    ///
82    /// # Arguments
83    ///
84    /// * `events_url` - The URL to monitor for transaction events.
85    /// * `target_hash` - The transaction hash to wait for.
86    /// * `timeout_duration` - An optional timeout duration in milliseconds.
87    ///
88    /// # Returns
89    ///
90    /// A `Result` containing either the processed `EventParseResult` or an error message
91    pub async fn wait_transaction(
92        &self,
93        events_url: &str,
94        target_hash: &str,
95        timeout_duration: Option<u64>,
96    ) -> Result<EventParseResult, String> {
97        Self::wait_transaction_internal(
98            events_url.to_string(),
99            target_hash.to_string(),
100            timeout_duration,
101        )
102        .await
103    }
104
105    /// Internal function to wait for a deploy event.
106    ///
107    /// # Arguments
108    ///
109    /// * `events_url` - The URL to monitor for transaction events.
110    /// * `target_hash` - The transaction hash to wait for.
111    /// * `timeout_duration` - An optional timeout duration in milliseconds.
112    ///
113    /// # Returns
114    ///
115    /// A `Result` containing either the processed `EventParseResult` or an error message.
116    async fn wait_transaction_internal(
117        events_url: String,
118        target_hash: String,
119        timeout_duration: Option<u64>,
120    ) -> Result<EventParseResult, String> {
121        let watcher = Watcher::new(events_url, timeout_duration);
122        let result = watcher.start_internal(Some(target_hash)).await;
123        match result {
124            Some(event_parse_results) => {
125                if let Some(event_parse_result) = event_parse_results.first() {
126                    return Ok(event_parse_result.clone());
127                }
128                Err("No first event result".to_string())
129            }
130            None => Err("No event result found".to_string()),
131        }
132    }
133}
134
135#[wasm_bindgen]
136impl SDK {
137    /// Creates a new Watcher instance to watch deploys (JavaScript-friendly).
138    /// Legacy alias
139    ///
140    /// # Arguments
141    ///
142    /// * `events_url` - The URL to monitor for transaction events.
143    /// * `timeout_duration` - An optional timeout duration in seconds.
144    ///
145    /// # Returns
146    ///
147    /// A `Watcher` instance.
148    #[cfg(target_arch = "wasm32")]
149    #[wasm_bindgen(js_name = "watchDeploy")]
150    #[deprecated(note = "prefer 'watchTransaction'")]
151    #[allow(deprecated)]
152    pub fn watch_deploy_js_alias(
153        &self,
154        events_url: &str,
155        timeout_duration: Option<u32>,
156    ) -> Watcher {
157        self.watch_deploy(events_url, timeout_duration.map(Into::into))
158    }
159
160    /// Creates a new Watcher instance to watch deploys (JavaScript-friendly).
161    ///
162    /// # Arguments
163    ///
164    /// * `events_url` - The URL to monitor for transaction events.
165    /// * `timeout_duration` - An optional timeout duration in seconds.
166    ///
167    /// # Returns
168    ///
169    /// A `Watcher` instance.
170    #[cfg(target_arch = "wasm32")]
171    #[wasm_bindgen(js_name = "watchTransaction")]
172    pub fn watch_transaction_js_alias(
173        &self,
174        events_url: &str,
175        timeout_duration: Option<u32>,
176    ) -> Watcher {
177        self.watch_transaction(events_url, timeout_duration.map(Into::into))
178    }
179
180    /// Waits for a deploy event to be processed asynchronously (JavaScript-friendly).
181    /// Legacy alias
182    ///
183    /// # Arguments
184    ///
185    /// * `events_url` - The URL to monitor for transaction events.
186    /// * `deploy_hash` - The deploy hash to wait for.
187    /// * `timeout_duration` - An optional timeout duration in seconds.
188    ///
189    /// # Returns
190    ///
191    /// A JavaScript `Promise` resolving to either the processed `EventParseResult` or an error message.
192    #[cfg(target_arch = "wasm32")]
193    #[wasm_bindgen(js_name = "waitDeploy")]
194    #[deprecated(note = "prefer 'waitTransaction' with transaction")]
195    #[allow(deprecated)]
196    pub async fn wait_deploy_js_alias(
197        &self,
198        events_url: &str,
199        deploy_hash: &str,
200        timeout_duration: Option<u32>,
201    ) -> Promise {
202        self.wait_transaction_js_alias(events_url, deploy_hash, timeout_duration)
203            .await
204    }
205
206    /// Waits for a deploy event to be processed asynchronously (JavaScript-friendly).
207    ///
208    /// # Arguments
209    ///
210    /// * `events_url` - The URL to monitor for transaction events.
211    /// * `target_hash` - The transaction hash to wait for.
212    /// * `timeout_duration` - An optional timeout duration in seconds.
213    ///
214    /// # Returns
215    ///
216    /// A JavaScript `Promise` resolving to either the processed `EventParseResult` or an error message.
217    #[cfg(target_arch = "wasm32")]
218    #[wasm_bindgen(js_name = "waitTransaction")]
219    pub async fn wait_transaction_js_alias(
220        &self,
221        events_url: &str,
222        target_hash: &str,
223        timeout_duration: Option<u32>,
224    ) -> Promise {
225        let events_url = events_url.to_string();
226        let target_hash = target_hash.to_string();
227        let future = async move {
228            let result = Self::wait_transaction_internal(
229                events_url,
230                target_hash,
231                timeout_duration.map(Into::into),
232            )
233            .await;
234            match result {
235                Ok(event_parse_result) => JsValue::from_serde(&event_parse_result)
236                    .map_err(|err| JsValue::from_str(&format!("{err}"))),
237                Err(err) => Err(JsValue::from_str(&err)),
238            }
239        };
240
241        future_to_promise(future)
242    }
243}
244
/// Watches an event stream for processed transactions.
///
/// Clients register [`Subscription`]s (a target hash plus a handler), then start the
/// watcher; alternatively the SDK runs a watcher internally to wait for one hash.
///
/// # Fields
///
/// * `events_url` - The URL of the event stream that is requested when watching starts.
/// * `subscriptions` - Registered subscriptions, one entry per target hash.
/// * `active` - Shared flag: set to `true` when watching starts and to `false` by `stop`.
///   It lives behind an `Rc`, so clones of a watcher observe the same flag.
/// * `timeout_duration` - Time budget compared against the elapsed time while reading the stream.
#[derive(Clone)]
#[wasm_bindgen]
pub struct Watcher {
    events_url: String,
    subscriptions: Vec<Subscription>,
    active: Rc<RefCell<bool>>,
    timeout_duration: Duration,
}
264
265#[wasm_bindgen]
266impl Watcher {
267    /// Creates a new `Watcher` instance.
268    ///
269    /// # Arguments
270    ///
271    /// * `events_url` - The URL for transaction events.
272    /// * `timeout_duration` - Optional duration in milliseconds for watching events. If not provided,
273    ///   a default timeout of 60,000 milliseconds (1 minute) is used.
274    ///
275    /// # Returns
276    ///
277    /// A new `Watcher` instance.
278    #[wasm_bindgen(constructor)]
279    pub fn new(events_url: String, timeout_duration: Option<u64>) -> Self {
280        let timeout_duration = Duration::try_milliseconds(
281            timeout_duration
282                .unwrap_or(DEFAULT_TIMEOUT_MS)
283                .try_into()
284                .unwrap(),
285        )
286        .unwrap_or_default();
287
288        Watcher {
289            events_url,
290            subscriptions: Vec::new(),
291            active: Rc::new(RefCell::new(true)),
292            timeout_duration,
293        }
294    }
295
296    /// Subscribes to transaction events.
297    ///
298    /// # Arguments
299    ///
300    /// * `subscriptions` - Vector of deploy subscriptions to be added.
301    ///
302    /// # Returns
303    ///
304    /// Result indicating success or an error message.
305    #[cfg(target_arch = "wasm32")]
306    #[wasm_bindgen(js_name = "subscribe")]
307    pub fn subscribe_js_alias(&mut self, subscriptions: Vec<Subscription>) -> Result<(), String> {
308        self.subscribe(subscriptions)
309    }
310
311    /// Unsubscribes from transaction events based on the provided transaction hash.
312    ///
313    /// # Arguments
314    ///
315    /// * `transaction_hash` - The transaction hash to unsubscribe.
316    ///
317    /// This method removes the deploy subscription associated with the provided transaction hash.
318    #[wasm_bindgen]
319    pub fn unsubscribe(&mut self, target_hash: String) {
320        self.subscriptions.retain(|s| s.target_hash != target_hash);
321    }
322
323    /// Starts watching for transaction events (JavaScript-friendly).
324    ///
325    /// # Returns
326    ///
327    /// Result containing the serialized transaction events data or an error message.
328    #[cfg(target_arch = "wasm32")]
329    #[wasm_bindgen(js_name = "start")]
330    pub async fn start_js_alias(&self) -> Result<JsValue, JsError> {
331        let result = match self.start_internal(None).await {
332            Some(res) => res,
333            None => return Ok(JsValue::NULL),
334        };
335
336        let serialized = JsValue::from_serde(&result)
337            .map_err(|err| JsError::new(&format!("Error serializing events: {:?}", err)))?;
338
339        Ok(serialized)
340    }
341
342    /// Stops watching for transaction events.
343    ///
344    /// This method sets the deploy watcher as inactive and stops the event listener if it exists.
345    #[wasm_bindgen]
346    pub fn stop(&self) {
347        *self.active.borrow_mut() = false;
348    }
349}
350
351impl Watcher {
352    /// Asynchronously starts watching for transaction events and execute callback handler functions from deploy subscriptions
353    ///
354    /// # Returns
355    ///
356    /// An `Option` containing the serialized deploy event data or `None` if no events are received.
357    pub async fn start(&self) -> Option<Vec<EventParseResult>> {
358        self.start_internal(None).await
359    }
360
361    /// Asynchronously starts watching for transaction events
362    ///
363    /// # Arguments
364    ///
365    /// * `transaction_hash` - Optional transaction hash to directly return processed event. If provided, it directly returns matched events without executing callback handler functions from deploy subscriptions. If `None`, it executes callback handler functions from deploy subscriptions.
366    ///
367    /// # Returns
368    ///
369    /// An `Option` containing the serialized deploy event data or `None` if no events are received.
370    async fn start_internal(&self, target_hash: Option<String>) -> Option<Vec<EventParseResult>> {
371        *self.active.borrow_mut() = true;
372
373        let client = reqwest::Client::new();
374        let url = self.events_url.clone();
375
376        let watcher = Rc::new(RefCell::new(self.clone()));
377
378        let start_time = Utc::now();
379        let timeout_duration = self.timeout_duration;
380
381        let response = match client.get(&url).send().await {
382            Ok(res) => res,
383            Err(err) => {
384                let err = err.to_string();
385                let event_parse_result = EventParseResult {
386                    err: Some(err.to_string()),
387                    body: None,
388                };
389                return Some([event_parse_result].to_vec());
390            }
391        };
392
393        if response.status().is_success() {
394            let buffer_size = 1;
395            let mut buffer = Vec::with_capacity(buffer_size);
396
397            let mut bytes_stream = response.bytes_stream();
398            while let Some(chunk) = bytes_stream.next().await {
399                match chunk {
400                    Ok(bytes) => {
401                        let this_clone = Rc::clone(&watcher);
402                        if !*this_clone.borrow_mut().active.borrow() {
403                            return None;
404                        }
405
406                        if Utc::now() - start_time >= timeout_duration {
407                            let event_parse_result = EventParseResult {
408                                err: Some("Timeout expired".to_string()),
409                                body: None,
410                            };
411                            return Some([event_parse_result].to_vec());
412                        }
413
414                        buffer.extend_from_slice(&bytes);
415
416                        while let Some(index) = buffer.iter().position(|&b| b == b'\n') {
417                            let message = buffer.drain(..=index).collect::<Vec<_>>();
418
419                            if let Ok(message) = std::str::from_utf8(&message) {
420                                let watcher_clone = this_clone.borrow_mut().clone();
421                                let result =
422                                    watcher_clone.process_events(message, target_hash.as_deref());
423                                match result {
424                                    Some(event_parse_result) => return Some(event_parse_result),
425                                    None => {
426                                        continue;
427                                    }
428                                };
429                            } else {
430                                let event_parse_result = EventParseResult {
431                                    err: Some("Error decoding UTF-8 data".to_string()),
432                                    body: None,
433                                };
434                                return Some([event_parse_result].to_vec());
435                            }
436                        }
437                    }
438                    Err(err) => {
439                        let event_parse_result = EventParseResult {
440                            err: Some(format!("Error reading chunk: {}", err)),
441                            body: None,
442                        };
443                        return Some([event_parse_result].to_vec());
444                    }
445                }
446            }
447        } else {
448            let event_parse_result = EventParseResult {
449                err: Some("Failed to fetch stream".to_string()),
450                body: None,
451            };
452            return Some([event_parse_result].to_vec());
453        }
454        None
455    }
456
457    /// Subscribes to transaction events.
458    ///
459    /// # Arguments
460    ///
461    /// * `subscriptions` - Vector of subscriptions to be added.
462    ///
463    /// # Returns
464    ///
465    /// Result indicating success or an error message.
466    pub fn subscribe(&mut self, subscriptions: Vec<Subscription>) -> Result<(), String> {
467        for new_subscription in &subscriptions {
468            if self
469                .subscriptions
470                .iter()
471                .any(|s| s.target_hash == new_subscription.target_hash)
472            {
473                return Err(String::from("Already subscribed to this event"));
474            }
475        }
476        self.subscriptions.extend(subscriptions);
477        Ok(())
478    }
479
480    /// Processes events received from the stream and notifies subscribers.
481    ///
482    /// # Arguments
483    ///
484    /// * `message` - The raw message received from the event stream.
485    /// * `target_transaction_hash` - Optional transaction hash to directly return. If provided, it directly returns matched events without executing callback handler functions from subscriptions. If `None`, it executes callback handler functions from subscriptions.
486    ///
487    /// # Returns
488    ///
489    /// An `Option` containing the serialized transaction/deploy event data or `None` if an error occurs.
490    fn process_events(
491        mut self,
492        message: &str,
493        target_hash: Option<&str>,
494    ) -> Option<Vec<EventParseResult>> {
495        let data_stream = Self::extract_data_stream(message);
496
497        for data_item in data_stream {
498            let trimmed_item = data_item.trim();
499            let transaction_processed_str = EventName::TransactionProcessed.to_string();
500
501            if !trimmed_item.contains(&transaction_processed_str) {
502                continue;
503            }
504
505            if let Ok(parsed_json) = serde_json::from_str::<Value>(trimmed_item) {
506                let transaction = parsed_json.get(transaction_processed_str);
507                if let Some(transaction_processed) =
508                    transaction.and_then(|transaction| transaction.as_object())
509                {
510                    if let Some(transaction_hash_processed) = transaction_processed
511                        .get("transaction_hash")
512                        .and_then(|transaction_hash| {
513                            transaction_hash
514                                .get("Version1")
515                                .or_else(|| transaction_hash.get("Deploy"))
516                                .and_then(|transaction_hash| transaction_hash.as_str())
517                        })
518                    {
519                        let mut transaction_hash_found =
520                            target_hash == Some(transaction_hash_processed);
521
522                        let transaction_processed: Option<TransactionProcessed> =
523                            serde_json::from_value(transaction.unwrap().clone()).ok();
524
525                        let body = Some(Body {
526                            transaction_processed,
527                        });
528
529                        let event_parse_result = EventParseResult { err: None, body };
530
531                        if transaction_hash_found {
532                            self.unsubscribe(target_hash.unwrap().to_string());
533                            self.stop();
534                            return Some([event_parse_result].to_vec());
535                        }
536
537                        let mut results: Vec<EventParseResult> = [].to_vec();
538                        for subscription in self.subscriptions.clone().iter() {
539                            if transaction_hash_processed == subscription.target_hash {
540                                let event_handler = &subscription.event_handler_fn;
541
542                                #[cfg(not(target_arch = "wasm32"))]
543                                {
544                                    event_handler.call(event_parse_result.clone());
545                                }
546                                #[cfg(target_arch = "wasm32")]
547                                {
548                                    let this = JsValue::null();
549                                    let args = js_sys::Array::new();
550                                    args.push(
551                                        &JsValue::from_serde(&event_parse_result.clone()).unwrap(),
552                                    );
553                                    event_handler.apply(&this, &args).unwrap();
554                                }
555
556                                self.unsubscribe(transaction_hash_processed.to_string());
557                                transaction_hash_found = true;
558                                results.push(event_parse_result.clone())
559                            }
560                        }
561
562                        if transaction_hash_found && self.subscriptions.is_empty() {
563                            self.stop();
564                            return Some(results);
565                        }
566                    }
567                }
568            } else {
569                let event_parse_result = EventParseResult {
570                    err: Some("Failed to parse JSON data.".to_string()),
571                    body: None,
572                };
573                return Some([event_parse_result].to_vec());
574            }
575        }
576        None
577    }
578
579    /// Extracts the data stream from the raw JSON data.
580    ///
581    /// # Arguments
582    ///
583    /// * `json_data` - The raw JSON data containing the data stream.
584    ///
585    /// # Returns
586    ///
587    /// A vector of data items within the data stream.
588    fn extract_data_stream(json_data: &str) -> Vec<&str> {
589        let data_stream: Vec<&str> = json_data
590            .split("data:")
591            .filter(|s| !s.is_empty())
592            .map(|s| s.split("id:").next().unwrap_or(""))
593            .collect();
594        data_stream
595    }
596}
597
/// A wrapper for an event handler function.
///
/// The callback is stored behind `Arc<Mutex<..>>`: clones of the wrapper share the same
/// callback, and the mutex is locked for the duration of each invocation.
pub struct EventHandlerFn(Arc<Mutex<dyn Fn(EventParseResult) + Send + Sync>>);
600
601#[allow(dead_code)]
602impl EventHandlerFn {
603    /// Creates a new `EventHandlerFn` with the specified event handling function.
604    ///
605    /// # Arguments
606    ///
607    /// * `func` - A function that takes an `EventParseResult` as an argument.
608    ///
609    /// # Returns
610    ///
611    /// A new `EventHandlerFn` instance.
612    pub fn new<F>(func: F) -> Self
613    where
614        F: Fn(EventParseResult) + Send + Sync + 'static,
615    {
616        EventHandlerFn(Arc::new(Mutex::new(func)))
617    }
618
619    /// Calls the stored event handling function with the provided `EventParseResult`.
620    ///
621    /// # Arguments
622    ///
623    /// * `event_result` - The result of an event to be passed to the stored event handling function.
624    pub fn call(&self, event_result: EventParseResult) {
625        let func = self.0.lock().unwrap();
626        (*func)(event_result); // Call the stored function with arguments
627    }
628}
629
630impl fmt::Debug for EventHandlerFn {
631    /// Implements the `Debug` trait for better debugging support.
632    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
633        write!(f, "EventHandlerFn")
634    }
635}
636
637impl Clone for EventHandlerFn {
638    /// Implements the `Clone` trait for creating a cloned instance with shared underlying data.
639    fn clone(&self) -> Self {
640        EventHandlerFn(self.0.clone())
641    }
642}
643
644impl Default for EventHandlerFn {
645    /// Implements the `Default` trait, creating a default instance with a no-op event handling function.
646    fn default() -> Self {
647        EventHandlerFn(Arc::new(Mutex::new(|_event_result| {})))
648    }
649}
650
651// Define Subscription struct with different configurations based on the target architecture.
652#[cfg(not(target_arch = "wasm32"))]
653/// Represents a subscription to transaction events for non-wasm32 target architecture.
654#[derive(Debug, Clone, Default)]
655pub struct Subscription {
656    /// Transaction target hash to identify the subscription.
657    pub target_hash: String,
658    /// Handler function for transaction events.
659    pub event_handler_fn: EventHandlerFn,
660}
661
#[cfg(target_arch = "wasm32")]
/// A subscription to transaction events (wasm32 targets).
#[derive(Clone, Debug, Default)]
#[wasm_bindgen(getter_with_clone)]
pub struct Subscription {
    /// Hash of the transaction this subscription waits for (`targetHash` in JavaScript).
    #[wasm_bindgen(js_name = "targetHash")]
    pub target_hash: String,
    /// JavaScript callback invoked with the event result (`eventHandlerFn` in JavaScript).
    #[wasm_bindgen(js_name = "eventHandlerFn")]
    pub event_handler_fn: js_sys::Function,
}
674
675impl Subscription {
676    /// Constructor for Subscription for non-wasm32 target architecture.
677    ///
678    /// # Arguments
679    ///
680    /// * `target_hash` - Transaction target hash to identify the subscription.
681    /// * `event_handler_fn` - Handler function for transaction events.
682    #[cfg(not(target_arch = "wasm32"))]
683    pub fn new(target_hash: String, event_handler_fn: EventHandlerFn) -> Self {
684        Self {
685            target_hash,
686            event_handler_fn,
687        }
688    }
689}
690
691#[wasm_bindgen]
692impl Subscription {
693    /// Constructor for Subscription for wasm32 target architecture.
694    ///
695    /// # Arguments
696    ///
697    /// * `transaction_hash` - Transaction hash to identify the subscription.
698    /// * `event_handler_fn` - Handler function for transaction events.
699    #[cfg(target_arch = "wasm32")]
700    #[wasm_bindgen(constructor)]
701    pub fn new(target_hash: String, event_handler_fn: js_sys::Function) -> Self {
702        Self {
703            target_hash,
704            event_handler_fn,
705        }
706    }
707}
708
709/// Represents a failure response containing an error message.
710#[derive(Debug, Deserialize, Clone, Default, Serialize)]
711#[wasm_bindgen(getter_with_clone)]
712pub struct Failure {
713    pub cost: String,
714    pub error_message: String,
715}
716
717/// Represents a success response containing a cost value.
718#[derive(Debug, Deserialize, Clone, Default, Serialize)]
719#[wasm_bindgen(getter_with_clone)]
720pub struct Version2 {
721    pub initiator: PublicKeyString,
722    pub error_message: Option<String>,
723    pub limit: String,
724    pub consumed: String,
725    pub cost: String,
726    // pub payment: Vec<Payment>,
727}
728
729#[derive(Debug, Deserialize, Clone, Default, Serialize)]
730#[wasm_bindgen(getter_with_clone)]
731pub struct Payment {
732    pub source: String,
733}
734
735/// Represents the result of an execution, either Success or Failure.
736#[derive(Debug, Deserialize, Clone, Default, Serialize)]
737#[wasm_bindgen(getter_with_clone)]
738pub struct ExecutionResult {
739    /// Optional Success information.
740    #[serde(rename = "Version2")]
741    #[wasm_bindgen(js_name = "Success")]
742    pub success: Option<Version2>,
743    /// Optional Failure information.
744    #[serde(rename = "Failure")]
745    #[wasm_bindgen(js_name = "Failure")]
746    pub failure: Option<Failure>,
747}
748
749#[derive(Debug, Clone, Serialize, Default)]
750#[wasm_bindgen(getter_with_clone)]
751pub struct HashString {
752    pub hash: String,
753}
754
755impl HashString {
756    fn from_hash(hash: String) -> Self {
757        HashString { hash }
758    }
759}
760
761impl<'de> Deserialize<'de> for HashString {
762    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
763    where
764        D: Deserializer<'de>,
765    {
766        let map: std::collections::HashMap<String, String> =
767            Deserialize::deserialize(deserializer)?;
768
769        if let Some(hash) = map.get("Version1").or_else(|| map.get("Deploy")) {
770            Ok(HashString::from_hash(hash.clone()))
771        } else {
772            Err(serde::de::Error::missing_field("Deploy or Version1"))
773        }
774    }
775}
776
777#[wasm_bindgen]
778impl HashString {
779    #[wasm_bindgen(getter, js_name = "Deploy")]
780    pub fn deploy(&self) -> String {
781        self.hash.clone()
782    }
783
784    #[wasm_bindgen(getter, js_name = "Version1")]
785    pub fn version1(&self) -> String {
786        self.hash.clone()
787    }
788
789    #[wasm_bindgen(js_name = "toString")]
790    pub fn to_string_js(&self) -> String {
791        self.to_string()
792    }
793}
794
795impl fmt::Display for HashString {
796    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
797        write!(f, "{}", self.hash)
798    }
799}
800
801#[derive(Debug, Deserialize, Clone, Serialize, Default)]
802#[wasm_bindgen(getter_with_clone)]
803pub struct PublicKeyString {
804    #[serde(rename = "PublicKey")]
805    #[wasm_bindgen(js_name = "PublicKey")]
806    pub public_key: String,
807}
808
809#[derive(Debug, Deserialize, Clone, Serialize, Default)]
810#[wasm_bindgen(getter_with_clone)]
811pub struct Message {
812    #[serde(rename = "String")]
813    #[wasm_bindgen(js_name = "String")]
814    pub string: String,
815}
816
817#[derive(Debug, Deserialize, Clone, Serialize, Default)]
818#[wasm_bindgen(getter_with_clone)]
819pub struct Messages {
820    pub entity_hash: String,
821    pub message: Message,
822    pub topic_name: String,
823    pub topic_name_hash: String,
824    pub topic_index: u32,
825    pub block_index: u64,
826}
827
828/// Represents processed deploy information.
829#[derive(Debug, Deserialize, Clone, Default, Serialize)]
830#[wasm_bindgen(getter_with_clone)]
831pub struct TransactionProcessed {
832    #[serde(alias = "transaction_hash")]
833    pub hash: HashString,
834    pub initiator_addr: PublicKeyString,
835    pub timestamp: String,
836    pub ttl: String,
837    pub block_hash: String,
838    /// Result of the execution, either Success or Failure.
839    pub execution_result: ExecutionResult,
840    pub messages: Vec<Messages>,
841}
842
843/// Represents the body of an event, containing processed deploy information.
844#[derive(Debug, Deserialize, Clone, Default, Serialize)]
845#[wasm_bindgen(getter_with_clone)]
846pub struct Body {
847    #[serde(rename = "TransactionProcessed")]
848    pub transaction_processed: Option<TransactionProcessed>,
849}
850
851// Implementing methods to get the field using different aliases
852#[wasm_bindgen]
853impl Body {
854    #[wasm_bindgen(getter, js_name = "get_deploy_processed")]
855    #[deprecated(note = "prefer 'get_transaction_processed'")]
856    #[allow(deprecated)]
857    pub fn get_deploy_processed(&self) -> Option<TransactionProcessed> {
858        self.transaction_processed.clone()
859    }
860
861    #[wasm_bindgen(getter, js_name = "get_transaction_processed")]
862    pub fn get_transaction_processed(&self) -> Option<TransactionProcessed> {
863        self.transaction_processed.clone()
864    }
865}
866
867/// Represents the result of parsing an event, containing error information and the event body.
868#[derive(Debug, Deserialize, Clone, Default, Serialize)]
869#[wasm_bindgen(getter_with_clone)]
870pub struct EventParseResult {
871    pub err: Option<String>,
872    pub body: Option<Body>,
873}
874
875/// Enum representing different event names.
876#[derive(Debug, Deserialize, Clone, Serialize)]
877enum EventName {
878    BlockAdded,
879    TransactionAccepted,
880    TransactionExpired,
881    TransactionProcessed,
882    Step,
883    FinalitySignature,
884    Fault,
885}
886
887impl fmt::Display for EventName {
888    /// Implements the `fmt::Display` trait for converting the enum variant to its string representation.
889    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
890        match self {
891            EventName::BlockAdded => write!(f, "BlockAdded"),
892            EventName::TransactionAccepted => write!(f, "TransactionAccepted"),
893            EventName::TransactionExpired => write!(f, "TransactionExpired"),
894            EventName::TransactionProcessed => write!(f, "TransactionProcessed"),
895            EventName::Step => write!(f, "Step"),
896            EventName::FinalitySignature => write!(f, "FinalitySignature"),
897            EventName::Fault => write!(f, "Fault"),
898        }
899    }
900}
901
902#[cfg(test)]
903mod tests {
904    use super::*;
905    use crate::watcher::{deploy_mock::DEPLOY_MOCK, transaction_mock::TRANSACTION_MOCK};
906    use sdk_tests::tests::helpers::get_network_constants;
907    use tokio;
908
909    #[test]
910    fn test_new() {
911        // Arrange
912        let (_, events_url, _, _, _) = get_network_constants();
913        let timeout_duration = 5000;
914
915        // Act
916        let watcher = Watcher::new(events_url.clone(), Some(timeout_duration));
917
918        // Assert
919        assert_eq!(watcher.events_url, events_url);
920        assert_eq!(watcher.subscriptions.len(), 0);
921        assert!(*watcher.active.borrow());
922        assert_eq!(
923            watcher.timeout_duration,
924            Duration::try_milliseconds(timeout_duration.try_into().unwrap()).unwrap()
925        );
926    }
927
928    #[test]
929    fn test_new_default_timeout() {
930        // Arrange
931        let (_, events_url, _, _, _) = get_network_constants();
932
933        // Act
934        let watcher = Watcher::new(events_url.clone(), None);
935
936        // Assert
937        assert_eq!(watcher.events_url, events_url);
938        assert_eq!(watcher.subscriptions.len(), 0);
939        assert!(*watcher.active.borrow());
940        assert_eq!(
941            watcher.timeout_duration,
942            Duration::try_milliseconds(DEFAULT_TIMEOUT_MS.try_into().unwrap()).unwrap()
943        );
944    }
945
946    #[tokio::test]
947    async fn test_extract_data_stream() {
948        // Arrange
949        let json_data = r#"data:segment1id:data:segment2id:data:segment3id:"#;
950
951        // Act
952        let result = Watcher::extract_data_stream(json_data);
953
954        // Assert
955        assert_eq!(result, vec!["segment1", "segment2", "segment3"]);
956    }
957
958    #[tokio::test]
959    #[allow(deprecated)]
960    async fn test_process_events_legacy() {
961        // Arrange
962        let (_, events_url, _, _, _) = get_network_constants();
963        let watcher = Watcher::new(events_url, None);
964        let deploy_hash = "19dbf9bdcd821e55392393c74c86deede02d9434d62d0bc72ab381ce7ea1c4f2";
965
966        let target_deploy_hash = Some(deploy_hash);
967
968        // Act
969        let result = watcher.process_events(DEPLOY_MOCK, target_deploy_hash);
970
971        // Assert
972        assert!(result.is_some());
973        let results = result.unwrap();
974        assert_eq!(results.len(), 1);
975
976        let event_parse_result = &results[0];
977        assert!(event_parse_result.err.is_none());
978
979        let body = event_parse_result.body.as_ref().unwrap();
980        let get_deploy_processed = body.get_deploy_processed().unwrap();
981        assert_eq!(get_deploy_processed.hash.to_string(), deploy_hash);
982    }
983
984    #[tokio::test]
985    async fn test_process_events() {
986        // Arrange
987        let (_, events_url, _, _, _) = get_network_constants();
988        let watcher = Watcher::new(events_url, None);
989        let transaction_hash = "8c6823d9480eee9fe0cfb5ed1fbf77f928cc6af21121298c05b4e3d87a328271";
990
991        let target_transaction_hash = Some(transaction_hash);
992
993        // Act
994        let result = watcher.process_events(TRANSACTION_MOCK, target_transaction_hash);
995
996        // Assert
997        assert!(result.is_some());
998        let results = result.unwrap();
999        assert_eq!(results.len(), 1);
1000
1001        let event_parse_result = &results[0];
1002        assert!(event_parse_result.err.is_none());
1003
1004        let body = event_parse_result.body.as_ref().unwrap();
1005        let transaction_processed = body.get_transaction_processed().unwrap();
1006        assert_eq!(transaction_processed.hash.to_string(), transaction_hash);
1007    }
1008
1009    #[tokio::test]
1010    async fn test_start_timeout() {
1011        // Arrange
1012        let (_, events_url, _, _, _) = get_network_constants();
1013        let watcher = Watcher::new(events_url, Some(1));
1014
1015        // Act
1016        let result = watcher.start().await;
1017
1018        // Assert
1019        assert!(result.is_some());
1020        let results = result.unwrap();
1021        assert_eq!(results.len(), 1);
1022        assert_eq!(results[0].err, Some("Timeout expired".to_string()));
1023        assert!(results[0].body.is_none());
1024    }
1025
1026    #[test]
1027    fn test_stop() {
1028        // Arrange
1029        let (_, events_url, _, _, _) = get_network_constants();
1030        let watcher = Watcher::new(events_url, None);
1031        assert!(*watcher.active.borrow());
1032
1033        // Act
1034        watcher.stop();
1035
1036        // Assert
1037        assert!(!(*watcher.active.borrow()));
1038    }
1039
1040    #[test]
1041    fn test_subscribe() {
1042        // Arrange
1043        let (_, events_url, _, _, _) = get_network_constants();
1044        let mut watcher = Watcher::new(events_url, None);
1045        let transaction_hash = "8c6823d9480eee9fe0cfb5ed1fbf77f928cc6af21121298c05b4e3d87a328271";
1046
1047        // Create a subscription
1048        let subscription =
1049            Subscription::new(transaction_hash.to_string(), EventHandlerFn::default());
1050
1051        // Act
1052        let result = watcher.subscribe(vec![subscription]);
1053
1054        // Assert
1055        assert!(result.is_ok());
1056
1057        // Try subscribing to the same deploy hash again
1058        let duplicate_subscription =
1059            Subscription::new(transaction_hash.to_string(), EventHandlerFn::default());
1060        let result_duplicate = watcher.subscribe(vec![duplicate_subscription]);
1061
1062        // Assert
1063        assert!(result_duplicate.is_err());
1064        assert_eq!(
1065            result_duplicate.err().unwrap(),
1066            "Already subscribed to this event"
1067        );
1068    }
1069
1070    #[test]
1071    fn test_unsubscribe() {
1072        // Arrange
1073        let (_, events_url, _, _, _) = get_network_constants();
1074        let mut watcher = Watcher::new(events_url, None);
1075        let transaction_hash = "8c6823d9480eee9fe0cfb5ed1fbf77f928cc6af21121298c05b4e3d87a328271";
1076
1077        // Subscribe to a transaction hash
1078        let transaction_hash_to_subscribe = transaction_hash.to_string();
1079        let subscription = Subscription::new(
1080            transaction_hash_to_subscribe.clone(),
1081            EventHandlerFn::default(),
1082        );
1083        let _ = watcher.subscribe(vec![subscription]);
1084
1085        // Assert that the deploy hash is initially subscribed
1086        assert!(watcher
1087            .subscriptions
1088            .iter()
1089            .any(|s| s.target_hash == transaction_hash_to_subscribe));
1090
1091        // Act
1092        watcher.unsubscribe(transaction_hash_to_subscribe.clone());
1093
1094        // Assert that the deploy hash is unsubscribed after calling unsubscribe
1095        assert!(!watcher
1096            .subscriptions
1097            .iter()
1098            .any(|s| s.target_hash == transaction_hash_to_subscribe));
1099    }
1100
1101    #[test]
1102    #[allow(deprecated)]
1103    fn test_sdk_watch_deploy_retunrs_instance() {
1104        // Arrange
1105        let sdk = SDK::new(None, None, None);
1106        let (_, events_url, _, _, _) = get_network_constants();
1107        let timeout_duration = 5000;
1108
1109        // Act
1110        let watcher = sdk.watch_deploy(&events_url, Some(timeout_duration));
1111
1112        // Assert
1113        assert_eq!(watcher.events_url, events_url);
1114        assert_eq!(watcher.subscriptions.len(), 0);
1115        assert!(*watcher.active.borrow());
1116        assert_eq!(
1117            watcher.timeout_duration,
1118            Duration::try_milliseconds(timeout_duration.try_into().unwrap()).unwrap()
1119        );
1120    }
1121
1122    #[test]
1123    fn test_sdk_watch_transaction_retunrs_instance() {
1124        // Arrange
1125        let sdk = SDK::new(None, None, None);
1126        let (_, events_url, _, _, _) = get_network_constants();
1127        let timeout_duration = 5000;
1128
1129        // Act
1130        let watcher = sdk.watch_transaction(&events_url, Some(timeout_duration));
1131
1132        // Assert
1133        assert_eq!(watcher.events_url, events_url);
1134        assert_eq!(watcher.subscriptions.len(), 0);
1135        assert!(*watcher.active.borrow());
1136        assert_eq!(
1137            watcher.timeout_duration,
1138            Duration::try_milliseconds(timeout_duration.try_into().unwrap()).unwrap()
1139        );
1140    }
1141
1142    #[tokio::test]
1143    #[allow(deprecated)]
1144    async fn test_wait_deploy_timeout() {
1145        // Arrange
1146        let sdk = SDK::new(None, None, None);
1147        let (_, events_url, _, _, _) = get_network_constants();
1148        let deploy_hash = "19dbf9bdcd821e55392393c74c86deede02d9434d62d0bc72ab381ce7ea1c4f2";
1149        let timeout_duration = Some(5000);
1150
1151        // Act
1152        let result = sdk
1153            .wait_deploy(&events_url, deploy_hash, timeout_duration)
1154            .await;
1155
1156        // Assert
1157        assert!(result.is_ok());
1158        let event_parse_result = result.unwrap();
1159        assert!(event_parse_result.err.is_some());
1160        assert_eq!(event_parse_result.err, Some("Timeout expired".to_string()));
1161    }
1162
1163    #[tokio::test]
1164    async fn test_wait_transaction_timeout() {
1165        // Arrange
1166        let sdk = SDK::new(None, None, None);
1167        let (_, events_url, _, _, _) = get_network_constants();
1168        let transaction_hash = "8c6823d9480eee9fe0cfb5ed1fbf77f928cc6af21121298c05b4e3d87a328271";
1169        let timeout_duration = Some(5000);
1170
1171        // Act
1172        let result = sdk
1173            .wait_transaction(&events_url, transaction_hash, timeout_duration)
1174            .await;
1175
1176        // Assert
1177        assert!(result.is_ok());
1178        let event_parse_result = result.unwrap();
1179        assert!(event_parse_result.err.is_some());
1180        assert_eq!(event_parse_result.err, Some("Timeout expired".to_string()));
1181    }
1182}