casper_rust_wasm_sdk/sdk/watcher/
watcher.rs

1use crate::SDK;
2use chrono::{Duration, Utc};
3use futures_util::StreamExt;
4#[cfg(target_arch = "wasm32")]
5use gloo_utils::format::JsValueSerdeExt;
6#[cfg(target_arch = "wasm32")]
7use js_sys::Promise;
8use serde::{Deserialize, Deserializer, Serialize};
9use serde_json::Value;
10use std::{
11    borrow::BorrowMut,
12    fmt,
13    sync::{Arc, Mutex},
14};
15use wasm_bindgen::prelude::*;
16#[cfg(target_arch = "wasm32")]
17use wasm_bindgen_futures::future_to_promise;
18
19const DEFAULT_TIMEOUT_MS: u64 = 60000;
20
21impl SDK {
22    /// Creates a new Watcher instance to watch deploys.
23    /// Legacy alias
24    ///
25    /// # Arguments
26    ///
27    /// * `events_url` - The URL to monitor for transaction events.
28    /// * `timeout_duration` - An optional timeout duration in seconds.
29    ///
30    /// # Returns
31    ///
32    /// A `Watcher` instance.
33    #[deprecated(note = "prefer 'watch_transaction'")]
34    pub fn watch_deploy(&self, events_url: &str, timeout_duration: Option<u64>) -> Watcher {
35        Watcher::new(events_url.to_string(), timeout_duration)
36    }
37
38    /// Creates a new Watcher instance to watch deploys.
39    ///
40    /// # Arguments
41    ///
42    /// * `events_url` - The URL to monitor for transaction events.
43    /// * `timeout_duration` - An optional timeout duration in seconds.
44    ///
45    /// # Returns
46    ///
47    /// A `Watcher` instance.
48    pub fn watch_transaction(&self, events_url: &str, timeout_duration: Option<u64>) -> Watcher {
49        Watcher::new(events_url.to_string(), timeout_duration)
50    }
51
52    /// Waits for a deploy event to be processed asynchronously.
53    /// Legacy alias
54    ///
55    /// # Arguments
56    ///
57    /// * `events_url` - The URL to monitor for transaction events.
58    /// * `deploy_hash` - The deploy hash to wait for.
59    /// * `timeout_duration` - An optional timeout duration in milliseconds.
60    ///
61    /// # Returns
62    ///
63    /// A `Result` containing either the processed `EventParseResult` or an error message.
64    #[deprecated(note = "prefer 'wait_transaction' with transaction")]
65    pub async fn wait_deploy(
66        &self,
67        events_url: &str,
68        deploy_hash: &str,
69        timeout_duration: Option<u64>,
70    ) -> Result<EventParseResult, String> {
71        Self::wait_transaction_internal(
72            events_url.to_string(),
73            deploy_hash.to_string(),
74            timeout_duration,
75        )
76        .await
77    }
78
79    /// Alias for wait_deploy Waits for a deploy event to be processed asynchronously.
80    ///
81    /// # Arguments
82    ///
83    /// * `events_url` - The URL to monitor for transaction events.
84    /// * `target_hash` - The transaction hash to wait for.
85    /// * `timeout_duration` - An optional timeout duration in milliseconds.
86    ///
87    /// # Returns
88    ///
89    /// A `Result` containing either the processed `EventParseResult` or an error message
90    pub async fn wait_transaction(
91        &self,
92        events_url: &str,
93        target_hash: &str,
94        timeout_duration: Option<u64>,
95    ) -> Result<EventParseResult, String> {
96        Self::wait_transaction_internal(
97            events_url.to_string(),
98            target_hash.to_string(),
99            timeout_duration,
100        )
101        .await
102    }
103
104    /// Internal function to wait for a deploy event.
105    ///
106    /// # Arguments
107    ///
108    /// * `events_url` - The URL to monitor for transaction events.
109    /// * `target_hash` - The transaction hash to wait for.
110    /// * `timeout_duration` - An optional timeout duration in milliseconds.
111    ///
112    /// # Returns
113    ///
114    /// A `Result` containing either the processed `EventParseResult` or an error message.
115    async fn wait_transaction_internal(
116        events_url: String,
117        target_hash: String,
118        timeout_duration: Option<u64>,
119    ) -> Result<EventParseResult, String> {
120        let watcher = Watcher::new(events_url, timeout_duration);
121        let result = watcher.start_internal(Some(target_hash)).await;
122        match result {
123            Some(event_parse_results) => {
124                if let Some(event_parse_result) = event_parse_results.first() {
125                    return Ok(event_parse_result.clone());
126                }
127                Err("No first event result".to_string())
128            }
129            None => Err("No event result found".to_string()),
130        }
131    }
132}
133
134#[wasm_bindgen]
135impl SDK {
136    /// Creates a new Watcher instance to watch deploys (JavaScript-friendly).
137    /// Legacy alias
138    ///
139    /// # Arguments
140    ///
141    /// * `events_url` - The URL to monitor for transaction events.
142    /// * `timeout_duration` - An optional timeout duration in seconds.
143    ///
144    /// # Returns
145    ///
146    /// A `Watcher` instance.
147    #[cfg(target_arch = "wasm32")]
148    #[wasm_bindgen(js_name = "watchDeploy")]
149    #[deprecated(note = "prefer 'watchTransaction'")]
150    #[allow(deprecated)]
151    pub fn watch_deploy_js_alias(
152        &self,
153        events_url: &str,
154        timeout_duration: Option<u32>,
155    ) -> Watcher {
156        self.watch_deploy(events_url, timeout_duration.map(Into::into))
157    }
158
159    /// Creates a new Watcher instance to watch deploys (JavaScript-friendly).
160    ///
161    /// # Arguments
162    ///
163    /// * `events_url` - The URL to monitor for transaction events.
164    /// * `timeout_duration` - An optional timeout duration in seconds.
165    ///
166    /// # Returns
167    ///
168    /// A `Watcher` instance.
169    #[cfg(target_arch = "wasm32")]
170    #[wasm_bindgen(js_name = "watchTransaction")]
171    pub fn watch_transaction_js_alias(
172        &self,
173        events_url: &str,
174        timeout_duration: Option<u32>,
175    ) -> Watcher {
176        self.watch_transaction(events_url, timeout_duration.map(Into::into))
177    }
178
179    /// Waits for a deploy event to be processed asynchronously (JavaScript-friendly).
180    /// Legacy alias
181    ///
182    /// # Arguments
183    ///
184    /// * `events_url` - The URL to monitor for transaction events.
185    /// * `deploy_hash` - The deploy hash to wait for.
186    /// * `timeout_duration` - An optional timeout duration in seconds.
187    ///
188    /// # Returns
189    ///
190    /// A JavaScript `Promise` resolving to either the processed `EventParseResult` or an error message.
191    #[cfg(target_arch = "wasm32")]
192    #[wasm_bindgen(js_name = "waitDeploy")]
193    #[deprecated(note = "prefer 'waitTransaction' with transaction")]
194    #[allow(deprecated)]
195    pub async fn wait_deploy_js_alias(
196        &self,
197        events_url: &str,
198        deploy_hash: &str,
199        timeout_duration: Option<u32>,
200    ) -> Promise {
201        self.wait_transaction_js_alias(events_url, deploy_hash, timeout_duration)
202            .await
203    }
204
205    /// Waits for a deploy event to be processed asynchronously (JavaScript-friendly).
206    ///
207    /// # Arguments
208    ///
209    /// * `events_url` - The URL to monitor for transaction events.
210    /// * `target_hash` - The transaction hash to wait for.
211    /// * `timeout_duration` - An optional timeout duration in seconds.
212    ///
213    /// # Returns
214    ///
215    /// A JavaScript `Promise` resolving to either the processed `EventParseResult` or an error message.
216    #[cfg(target_arch = "wasm32")]
217    #[wasm_bindgen(js_name = "waitTransaction")]
218    pub async fn wait_transaction_js_alias(
219        &self,
220        events_url: &str,
221        target_hash: &str,
222        timeout_duration: Option<u32>,
223    ) -> Promise {
224        let events_url = events_url.to_string();
225        let target_hash = target_hash.to_string();
226        let future = async move {
227            let result = Self::wait_transaction_internal(
228                events_url,
229                target_hash,
230                timeout_duration.map(Into::into),
231            )
232            .await;
233            match result {
234                Ok(event_parse_result) => JsValue::from_serde(&event_parse_result)
235                    .map_err(|err| JsValue::from_str(&format!("{err}"))),
236                Err(err) => Err(JsValue::from_str(&err)),
237            }
238        };
239
240        future_to_promise(future)
241    }
242}
243
244/// Represents a deploy watcher responsible for monitoring transaction events.
245///
246/// This struct allows clients to subscribe to transaction events, start watching for events,
247/// or wait for an event and handle the received deploy event data.
248///
249/// # Fields
250///
251/// * `events_url` - The URL for transaction events.
252/// * `subscriptions` - Vector containing deploy subscriptions.
253/// * `active` - Reference-counted cell indicating whether the deploy watcher is active.
254/// * `timeout_duration` - Duration representing the optional timeout for watching events.
255#[derive(Clone)]
256#[wasm_bindgen]
257pub struct Watcher {
258    events_url: String,
259    subscriptions: Vec<Subscription>,
260    active: Arc<Mutex<bool>>,
261    timeout_duration: Duration,
262}
263
264#[wasm_bindgen]
265impl Watcher {
266    /// Creates a new `Watcher` instance.
267    ///
268    /// # Arguments
269    ///
270    /// * `events_url` - The URL for transaction events.
271    /// * `timeout_duration` - Optional duration in milliseconds for watching events. If not provided,
272    ///   a default timeout of 60,000 milliseconds (1 minute) is used.
273    ///
274    /// # Returns
275    ///
276    /// A new `Watcher` instance.
277    #[wasm_bindgen(constructor)]
278    pub fn new(events_url: String, timeout_duration: Option<u64>) -> Self {
279        let timeout_duration = Duration::try_milliseconds(
280            timeout_duration
281                .unwrap_or(DEFAULT_TIMEOUT_MS)
282                .try_into()
283                .unwrap(),
284        )
285        .unwrap_or_default();
286
287        Watcher {
288            events_url,
289            subscriptions: Vec::new(),
290            active: Arc::new(Mutex::new(true)),
291            timeout_duration,
292        }
293    }
294
295    /// Subscribes to transaction events.
296    ///
297    /// # Arguments
298    ///
299    /// * `subscriptions` - Vector of deploy subscriptions to be added.
300    ///
301    /// # Returns
302    ///
303    /// Result indicating success or an error message.
304    #[cfg(target_arch = "wasm32")]
305    #[wasm_bindgen(js_name = "subscribe")]
306    pub fn subscribe_js_alias(&mut self, subscriptions: Vec<Subscription>) -> Result<(), String> {
307        self.subscribe(subscriptions)
308    }
309
310    /// Unsubscribes from transaction events based on the provided transaction hash.
311    ///
312    /// # Arguments
313    ///
314    /// * `transaction_hash` - The transaction hash to unsubscribe.
315    ///
316    /// This method removes the deploy subscription associated with the provided transaction hash.
317    #[wasm_bindgen]
318    pub fn unsubscribe(&mut self, target_hash: String) {
319        self.subscriptions.retain(|s| s.target_hash != target_hash);
320    }
321
322    /// Starts watching for transaction events (JavaScript-friendly).
323    ///
324    /// # Returns
325    ///
326    /// Result containing the serialized transaction events data or an error message.
327    #[cfg(target_arch = "wasm32")]
328    #[wasm_bindgen(js_name = "start")]
329    pub async fn start_js_alias(&self) -> Result<JsValue, JsError> {
330        let result = match self.start_internal(None).await {
331            Some(res) => res,
332            None => return Ok(JsValue::NULL),
333        };
334
335        let serialized = JsValue::from_serde(&result)
336            .map_err(|err| JsError::new(&format!("Error serializing events: {err:?}")))?;
337
338        Ok(serialized)
339    }
340
341    /// Stops watching for transaction events.
342    ///
343    /// This method sets the deploy watcher as inactive and stops the event listener if it exists.
344    #[wasm_bindgen]
345    pub fn stop(&self) {
346        {
347            let mut active = self.active.lock().unwrap();
348            *active = false;
349        }
350    }
351}
352
353impl Watcher {
354    /// Asynchronously starts watching for transaction events and execute callback handler functions from deploy subscriptions
355    ///
356    /// # Returns
357    ///
358    /// An `Option` containing the serialized deploy event data or `None` if no events are received.
359    pub async fn start(&self) -> Option<Vec<EventParseResult>> {
360        self.start_internal(None).await
361    }
362
363    /// Asynchronously starts watching for transaction events
364    ///
365    /// # Arguments
366    ///
367    /// * `transaction_hash` - Optional transaction hash to directly return processed event. If provided, it directly returns matched events without executing callback handler functions from deploy subscriptions. If `None`, it executes callback handler functions from deploy subscriptions.
368    ///
369    /// # Returns
370    ///
371    /// An `Option` containing the serialized deploy event data or `None` if no events are received.
372    async fn start_internal(&self, target_hash: Option<String>) -> Option<Vec<EventParseResult>> {
373        {
374            let mut active = self.active.lock().unwrap();
375            *active = true;
376        }
377
378        let client = reqwest::Client::new();
379        let url = self.events_url.clone();
380
381        let watcher = Arc::new(Mutex::new(self.clone()));
382
383        let start_time = Utc::now();
384        let timeout_duration = self.timeout_duration;
385
386        let response = match client.get(&url).send().await {
387            Ok(res) => res,
388            Err(err) => {
389                let err = err.to_string();
390                let event_parse_result = EventParseResult {
391                    err: Some(err.to_string()),
392                    body: None,
393                };
394                return Some([event_parse_result].to_vec());
395            }
396        };
397
398        if response.status().is_success() {
399            let buffer_size = 1;
400            let mut buffer = Vec::with_capacity(buffer_size);
401
402            let mut bytes_stream = response.bytes_stream();
403            while let Some(chunk) = bytes_stream.next().await {
404                match chunk {
405                    Ok(bytes) => {
406                        let this_clone = Arc::clone(&watcher);
407                        if !*this_clone.lock().unwrap().active.lock().unwrap() {
408                            return None;
409                        }
410
411                        if Utc::now() - start_time >= timeout_duration {
412                            let event_parse_result = EventParseResult {
413                                err: Some("Timeout expired".to_string()),
414                                body: None,
415                            };
416                            return Some([event_parse_result].to_vec());
417                        }
418
419                        buffer.extend_from_slice(&bytes);
420
421                        while let Some(index) = buffer.iter().position(|&b| b == b'\n') {
422                            let message = buffer.drain(..=index).collect::<Vec<_>>();
423
424                            if let Ok(message) = std::str::from_utf8(&message) {
425                                let watcher_guard = this_clone.lock().unwrap();
426                                let result = watcher_guard
427                                    .clone()
428                                    .process_events(message, target_hash.as_deref());
429                                match result {
430                                    Some(event_parse_result) => return Some(event_parse_result),
431                                    None => {
432                                        continue;
433                                    }
434                                };
435                            } else {
436                                let event_parse_result = EventParseResult {
437                                    err: Some("Error decoding UTF-8 data".to_string()),
438                                    body: None,
439                                };
440                                return Some([event_parse_result].to_vec());
441                            }
442                        }
443                    }
444                    Err(err) => {
445                        let event_parse_result = EventParseResult {
446                            err: Some(format!("Error reading chunk: {err}")),
447                            body: None,
448                        };
449                        return Some([event_parse_result].to_vec());
450                    }
451                }
452            }
453        } else {
454            let event_parse_result = EventParseResult {
455                err: Some("Failed to fetch stream".to_string()),
456                body: None,
457            };
458            return Some([event_parse_result].to_vec());
459        }
460        None
461    }
462
463    /// Subscribes to transaction events.
464    ///
465    /// # Arguments
466    ///
467    /// * `subscriptions` - Vector of subscriptions to be added.
468    ///
469    /// # Returns
470    ///
471    /// Result indicating success or an error message.
472    pub fn subscribe(&mut self, subscriptions: Vec<Subscription>) -> Result<(), String> {
473        for new_subscription in &subscriptions {
474            if self
475                .subscriptions
476                .iter()
477                .any(|s| s.target_hash == new_subscription.target_hash)
478            {
479                return Err(String::from("Already subscribed to this event"));
480            }
481        }
482        self.subscriptions.extend(subscriptions);
483        Ok(())
484    }
485
486    /// Processes events received from the stream and notifies subscribers.
487    ///
488    /// # Arguments
489    ///
490    /// * `message` - The raw message received from the event stream.
491    /// * `target_transaction_hash` - Optional transaction hash to directly return. If provided, it directly returns matched events without executing callback handler functions from subscriptions. If `None`, it executes callback handler functions from subscriptions.
492    ///
493    /// # Returns
494    ///
495    /// An `Option` containing the serialized transaction/deploy event data or `None` if an error occurs.
496    fn process_events(
497        mut self,
498        message: &str,
499        target_hash: Option<&str>,
500    ) -> Option<Vec<EventParseResult>> {
501        let data_stream = Self::extract_data_stream(message);
502
503        for data_item in data_stream {
504            let trimmed_item = data_item.trim();
505            let transaction_processed_str = EventName::TransactionProcessed.to_string();
506
507            if !trimmed_item.contains(&transaction_processed_str) {
508                continue;
509            }
510
511            if let Ok(parsed_json) = serde_json::from_str::<Value>(trimmed_item) {
512                let transaction = parsed_json.get(transaction_processed_str);
513                if let Some(transaction_processed) =
514                    transaction.and_then(|transaction| transaction.as_object())
515                {
516                    if let Some(transaction_hash_processed) = transaction_processed
517                        .get("transaction_hash")
518                        .and_then(|transaction_hash| {
519                            transaction_hash
520                                .get("Version1")
521                                .or_else(|| transaction_hash.get("Deploy"))
522                                .and_then(|transaction_hash| transaction_hash.as_str())
523                        })
524                    {
525                        let mut transaction_hash_found =
526                            target_hash == Some(transaction_hash_processed);
527
528                        let transaction_processed: Option<TransactionProcessed> =
529                            serde_json::from_value(transaction.unwrap().clone()).ok();
530
531                        let body = Some(Body {
532                            transaction_processed,
533                        });
534
535                        let event_parse_result = EventParseResult { err: None, body };
536
537                        if transaction_hash_found {
538                            self.unsubscribe(target_hash.unwrap().to_string());
539                            self.stop();
540                            return Some([event_parse_result].to_vec());
541                        }
542
543                        let mut results: Vec<EventParseResult> = [].to_vec();
544                        for subscription in self.subscriptions.clone().iter() {
545                            if transaction_hash_processed == subscription.target_hash {
546                                let event_handler = &subscription.event_handler_fn;
547
548                                #[cfg(not(target_arch = "wasm32"))]
549                                {
550                                    event_handler.call(event_parse_result.clone());
551                                }
552                                #[cfg(target_arch = "wasm32")]
553                                {
554                                    let this = JsValue::null();
555                                    let args = js_sys::Array::new();
556                                    args.push(
557                                        &JsValue::from_serde(&event_parse_result.clone()).unwrap(),
558                                    );
559                                    event_handler.apply(&this, &args).unwrap();
560                                }
561
562                                self.unsubscribe(transaction_hash_processed.to_string());
563                                transaction_hash_found = true;
564                                results.push(event_parse_result.clone())
565                            }
566                        }
567
568                        if transaction_hash_found && self.subscriptions.is_empty() {
569                            self.stop();
570                            return Some(results);
571                        }
572                    }
573                }
574            } else {
575                let event_parse_result = EventParseResult {
576                    err: Some("Failed to parse JSON data.".to_string()),
577                    body: None,
578                };
579                return Some([event_parse_result].to_vec());
580            }
581        }
582        None
583    }
584
585    /// Extracts the data stream from the raw JSON data.
586    ///
587    /// # Arguments
588    ///
589    /// * `json_data` - The raw JSON data containing the data stream.
590    ///
591    /// # Returns
592    ///
593    /// A vector of data items within the data stream.
594    fn extract_data_stream(json_data: &str) -> Vec<&str> {
595        let data_stream: Vec<&str> = json_data
596            .split("data:")
597            .filter(|s| !s.is_empty())
598            .map(|s| s.split("id:").next().unwrap_or(""))
599            .collect();
600        data_stream
601    }
602}
603
604/// A wrapper for an event handler function, providing synchronization and cloning capabilities.
605pub struct EventHandlerFn(Arc<Mutex<dyn Fn(EventParseResult) + Send + Sync>>);
606
607#[allow(dead_code)]
608impl EventHandlerFn {
609    /// Creates a new `EventHandlerFn` with the specified event handling function.
610    ///
611    /// # Arguments
612    ///
613    /// * `func` - A function that takes an `EventParseResult` as an argument.
614    ///
615    /// # Returns
616    ///
617    /// A new `EventHandlerFn` instance.
618    pub fn new<F>(func: F) -> Self
619    where
620        F: Fn(EventParseResult) + Send + Sync + 'static,
621    {
622        EventHandlerFn(Arc::new(Mutex::new(func)))
623    }
624
625    /// Calls the stored event handling function with the provided `EventParseResult`.
626    ///
627    /// # Arguments
628    ///
629    /// * `event_result` - The result of an event to be passed to the stored event handling function.
630    pub fn call(&self, event_result: EventParseResult) {
631        let func = self.0.lock().unwrap();
632        (*func)(event_result); // Call the stored function with arguments
633    }
634}
635
636impl fmt::Debug for EventHandlerFn {
637    /// Implements the `Debug` trait for better debugging support.
638    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
639        write!(f, "EventHandlerFn")
640    }
641}
642
643impl Clone for EventHandlerFn {
644    /// Implements the `Clone` trait for creating a cloned instance with shared underlying data.
645    fn clone(&self) -> Self {
646        EventHandlerFn(self.0.clone())
647    }
648}
649
650impl Default for EventHandlerFn {
651    /// Implements the `Default` trait, creating a default instance with a no-op event handling function.
652    fn default() -> Self {
653        EventHandlerFn(Arc::new(Mutex::new(|_event_result| {})))
654    }
655}
656
657// Define Subscription struct with different configurations based on the target architecture.
658#[cfg(not(target_arch = "wasm32"))]
659/// Represents a subscription to transaction events for non-wasm32 target architecture.
660#[derive(Debug, Clone, Default)]
661pub struct Subscription {
662    /// Transaction target hash to identify the subscription.
663    pub target_hash: String,
664    /// Handler function for transaction events.
665    pub event_handler_fn: EventHandlerFn,
666}
667
668#[cfg(target_arch = "wasm32")]
669/// Represents a subscription to transaction events for wasm32 target architecture.
670#[derive(Debug, Clone, Default)]
671#[wasm_bindgen(getter_with_clone)]
672pub struct Subscription {
673    /// Transaction target hash to identify the subscription.
674    #[wasm_bindgen(js_name = "targetHash")]
675    pub target_hash: String,
676    /// Handler function for transaction events.
677    #[wasm_bindgen(js_name = "eventHandlerFn")]
678    pub event_handler_fn: js_sys::Function,
679}
680
681impl Subscription {
682    /// Constructor for Subscription for non-wasm32 target architecture.
683    ///
684    /// # Arguments
685    ///
686    /// * `target_hash` - Transaction target hash to identify the subscription.
687    /// * `event_handler_fn` - Handler function for transaction events.
688    #[cfg(not(target_arch = "wasm32"))]
689    pub fn new(target_hash: String, event_handler_fn: EventHandlerFn) -> Self {
690        Self {
691            target_hash,
692            event_handler_fn,
693        }
694    }
695}
696
697#[wasm_bindgen]
698impl Subscription {
699    /// Constructor for Subscription for wasm32 target architecture.
700    ///
701    /// # Arguments
702    ///
703    /// * `transaction_hash` - Transaction hash to identify the subscription.
704    /// * `event_handler_fn` - Handler function for transaction events.
705    #[cfg(target_arch = "wasm32")]
706    #[wasm_bindgen(constructor)]
707    pub fn new(target_hash: String, event_handler_fn: js_sys::Function) -> Self {
708        Self {
709            target_hash,
710            event_handler_fn,
711        }
712    }
713}
714
715/// Represents a failure response containing an error message.
716#[derive(Debug, Deserialize, Clone, Default, Serialize)]
717#[wasm_bindgen(getter_with_clone)]
718pub struct Failure {
719    pub cost: String,
720    pub error_message: String,
721}
722
723/// Represents a success response containing a cost value.
724#[derive(Debug, Deserialize, Clone, Default, Serialize)]
725#[wasm_bindgen(getter_with_clone)]
726pub struct Version2 {
727    pub initiator: PublicKeyString,
728    pub error_message: Option<String>,
729    pub limit: String,
730    pub consumed: String,
731    pub cost: String,
732    // pub payment: Vec<Payment>,
733}
734
735#[derive(Debug, Deserialize, Clone, Default, Serialize)]
736#[wasm_bindgen(getter_with_clone)]
737pub struct Payment {
738    pub source: String,
739}
740
741/// Represents the result of an execution, either Success or Failure.
742#[derive(Debug, Deserialize, Clone, Default, Serialize)]
743#[wasm_bindgen(getter_with_clone)]
744pub struct ExecutionResult {
745    /// Optional Success information.
746    #[serde(rename = "Version2")]
747    #[wasm_bindgen(js_name = "Success")]
748    pub success: Option<Version2>,
749    /// Optional Failure information.
750    #[serde(rename = "Failure")]
751    #[wasm_bindgen(js_name = "Failure")]
752    pub failure: Option<Failure>,
753}
754
755#[derive(Debug, Clone, Serialize, Default)]
756#[wasm_bindgen(getter_with_clone)]
757pub struct HashString {
758    pub hash: String,
759}
760
761impl HashString {
762    fn from_hash(hash: String) -> Self {
763        HashString { hash }
764    }
765}
766
767impl<'de> Deserialize<'de> for HashString {
768    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
769    where
770        D: Deserializer<'de>,
771    {
772        let map: std::collections::HashMap<String, String> =
773            Deserialize::deserialize(deserializer)?;
774
775        if let Some(hash) = map.get("Version1").or_else(|| map.get("Deploy")) {
776            Ok(HashString::from_hash(hash.clone()))
777        } else {
778            Err(serde::de::Error::missing_field("Deploy or Version1"))
779        }
780    }
781}
782
783#[wasm_bindgen]
784impl HashString {
785    #[wasm_bindgen(getter, js_name = "Deploy")]
786    pub fn deploy(&self) -> String {
787        self.hash.clone()
788    }
789
790    #[wasm_bindgen(getter, js_name = "Version1")]
791    pub fn version1(&self) -> String {
792        self.hash.clone()
793    }
794
795    #[wasm_bindgen(js_name = "toString")]
796    pub fn to_string_js(&self) -> String {
797        self.to_string()
798    }
799}
800
801impl fmt::Display for HashString {
802    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
803        write!(f, "{}", self.hash)
804    }
805}
806
807#[derive(Debug, Deserialize, Clone, Serialize, Default)]
808#[wasm_bindgen(getter_with_clone)]
809pub struct PublicKeyString {
810    #[serde(rename = "PublicKey")]
811    #[wasm_bindgen(js_name = "PublicKey")]
812    pub public_key: String,
813}
814
815#[derive(Debug, Deserialize, Clone, Serialize, Default)]
816#[wasm_bindgen(getter_with_clone)]
817pub struct Message {
818    #[serde(rename = "String")]
819    #[wasm_bindgen(js_name = "String")]
820    pub string: String,
821}
822
823#[derive(Debug, Deserialize, Clone, Serialize, Default)]
824#[wasm_bindgen(getter_with_clone)]
825pub struct Messages {
826    pub entity_hash: String,
827    pub message: Message,
828    pub topic_name: String,
829    pub topic_name_hash: String,
830    pub topic_index: u32,
831    pub block_index: u64,
832}
833
834/// Represents processed deploy information.
835#[derive(Debug, Deserialize, Clone, Default, Serialize)]
836#[wasm_bindgen(getter_with_clone)]
837pub struct TransactionProcessed {
838    #[serde(alias = "transaction_hash")]
839    pub hash: HashString,
840    pub initiator_addr: PublicKeyString,
841    pub timestamp: String,
842    pub ttl: String,
843    pub block_hash: String,
844    /// Result of the execution, either Success or Failure.
845    pub execution_result: ExecutionResult,
846    pub messages: Vec<Messages>,
847}
848
849/// Represents the body of an event, containing processed deploy information.
850#[derive(Debug, Deserialize, Clone, Default, Serialize)]
851#[wasm_bindgen(getter_with_clone)]
852pub struct Body {
853    #[serde(rename = "TransactionProcessed")]
854    pub transaction_processed: Option<TransactionProcessed>,
855}
856
857// Implementing methods to get the field using different aliases
858#[wasm_bindgen]
859impl Body {
860    #[wasm_bindgen(getter, js_name = "get_deploy_processed")]
861    #[deprecated(note = "prefer 'get_transaction_processed'")]
862    #[allow(deprecated)]
863    pub fn get_deploy_processed(&self) -> Option<TransactionProcessed> {
864        self.transaction_processed.clone()
865    }
866
867    #[wasm_bindgen(getter, js_name = "get_transaction_processed")]
868    pub fn get_transaction_processed(&self) -> Option<TransactionProcessed> {
869        self.transaction_processed.clone()
870    }
871}
872
873/// Represents the result of parsing an event, containing error information and the event body.
874#[derive(Debug, Deserialize, Clone, Default, Serialize)]
875#[wasm_bindgen(getter_with_clone)]
876pub struct EventParseResult {
877    pub err: Option<String>,
878    pub body: Option<Body>,
879}
880
881/// Enum representing different event names.
882#[derive(Debug, Deserialize, Clone, Serialize)]
883enum EventName {
884    BlockAdded,
885    TransactionAccepted,
886    TransactionExpired,
887    TransactionProcessed,
888    Step,
889    FinalitySignature,
890    Fault,
891}
892
893impl fmt::Display for EventName {
894    /// Implements the `fmt::Display` trait for converting the enum variant to its string representation.
895    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
896        match self {
897            EventName::BlockAdded => write!(f, "BlockAdded"),
898            EventName::TransactionAccepted => write!(f, "TransactionAccepted"),
899            EventName::TransactionExpired => write!(f, "TransactionExpired"),
900            EventName::TransactionProcessed => write!(f, "TransactionProcessed"),
901            EventName::Step => write!(f, "Step"),
902            EventName::FinalitySignature => write!(f, "FinalitySignature"),
903            EventName::Fault => write!(f, "Fault"),
904        }
905    }
906}
907
908#[cfg(test)]
909mod tests {
910    use std::borrow::Borrow;
911
912    use super::*;
913    use crate::watcher::{deploy_mock::DEPLOY_MOCK, transaction_mock::TRANSACTION_MOCK};
914    use sdk_tests::tests::helpers::get_network_constants;
915    use tokio;
916
917    #[test]
918    fn test_new() {
919        // Arrange
920        let (_, events_url, _, _, _) = get_network_constants();
921        let timeout_duration = 5000;
922
923        // Act
924        let watcher = Watcher::new(events_url.clone(), Some(timeout_duration));
925
926        // Assert
927        assert_eq!(watcher.events_url, events_url);
928        assert_eq!(watcher.subscriptions.len(), 0);
929        assert!(*watcher.active.lock().unwrap());
930        assert_eq!(
931            watcher.timeout_duration,
932            Duration::try_milliseconds(timeout_duration.try_into().unwrap()).unwrap()
933        );
934    }
935
936    #[test]
937    fn test_new_default_timeout() {
938        // Arrange
939        let (_, events_url, _, _, _) = get_network_constants();
940
941        // Act
942        let watcher = Watcher::new(events_url.clone(), None);
943
944        // Assert
945        assert_eq!(watcher.events_url, events_url);
946        assert_eq!(watcher.subscriptions.len(), 0);
947        assert!(*watcher.active.lock().unwrap());
948        assert_eq!(
949            watcher.timeout_duration,
950            Duration::try_milliseconds(DEFAULT_TIMEOUT_MS.try_into().unwrap()).unwrap()
951        );
952    }
953
954    #[tokio::test]
955    async fn test_extract_data_stream() {
956        // Arrange
957        let json_data = r#"data:segment1id:data:segment2id:data:segment3id:"#;
958
959        // Act
960        let result = Watcher::extract_data_stream(json_data);
961
962        // Assert
963        assert_eq!(result, vec!["segment1", "segment2", "segment3"]);
964    }
965
966    #[tokio::test]
967    #[allow(deprecated)]
968    async fn test_process_events_legacy() {
969        // Arrange
970        let (_, events_url, _, _, _) = get_network_constants();
971        let watcher = Watcher::new(events_url, None);
972        let deploy_hash = "19dbf9bdcd821e55392393c74c86deede02d9434d62d0bc72ab381ce7ea1c4f2";
973
974        let target_deploy_hash = Some(deploy_hash);
975
976        // Act
977        let result = watcher.process_events(DEPLOY_MOCK, target_deploy_hash);
978
979        // Assert
980        assert!(result.is_some());
981        let results = result.unwrap();
982        assert_eq!(results.len(), 1);
983
984        let event_parse_result = &results[0];
985        assert!(event_parse_result.err.is_none());
986
987        let body = event_parse_result.body.as_ref().unwrap();
988        let get_deploy_processed = body.get_deploy_processed().unwrap();
989        assert_eq!(get_deploy_processed.hash.to_string(), deploy_hash);
990    }
991
992    #[tokio::test]
993    async fn test_process_events() {
994        // Arrange
995        let (_, events_url, _, _, _) = get_network_constants();
996        let watcher = Watcher::new(events_url, None);
997        let transaction_hash = "8c6823d9480eee9fe0cfb5ed1fbf77f928cc6af21121298c05b4e3d87a328271";
998
999        let target_transaction_hash = Some(transaction_hash);
1000
1001        // Act
1002        let result = watcher.process_events(TRANSACTION_MOCK, target_transaction_hash);
1003
1004        // Assert
1005        assert!(result.is_some());
1006        let results = result.unwrap();
1007        assert_eq!(results.len(), 1);
1008
1009        let event_parse_result = &results[0];
1010        assert!(event_parse_result.err.is_none());
1011
1012        let body = event_parse_result.body.as_ref().unwrap();
1013        let transaction_processed = body.get_transaction_processed().unwrap();
1014        assert_eq!(transaction_processed.hash.to_string(), transaction_hash);
1015    }
1016
1017    #[tokio::test]
1018    async fn test_start_timeout() {
1019        // Arrange
1020        let (_, events_url, _, _, _) = get_network_constants();
1021        let watcher = Watcher::new(events_url, Some(1));
1022
1023        // Act
1024        let result = watcher.start().await;
1025
1026        // Assert
1027        assert!(result.is_some());
1028        let results = result.unwrap();
1029        assert_eq!(results.len(), 1);
1030        assert_eq!(results[0].err, Some("Timeout expired".to_string()));
1031        assert!(results[0].body.is_none());
1032    }
1033
1034    #[test]
1035    fn test_stop() {
1036        // Arrange
1037        let (_, events_url, _, _, _) = get_network_constants();
1038        let watcher = Watcher::new(events_url, None);
1039        assert!(*watcher.active.lock().unwrap());
1040
1041        // Act
1042        watcher.stop();
1043
1044        // Assert
1045        assert!(!(*watcher.active.lock().unwrap()));
1046    }
1047
1048    #[test]
1049    fn test_subscribe() {
1050        // Arrange
1051        let (_, events_url, _, _, _) = get_network_constants();
1052        let mut watcher = Watcher::new(events_url, None);
1053        let transaction_hash = "8c6823d9480eee9fe0cfb5ed1fbf77f928cc6af21121298c05b4e3d87a328271";
1054
1055        // Create a subscription
1056        let subscription =
1057            Subscription::new(transaction_hash.to_string(), EventHandlerFn::default());
1058
1059        // Act
1060        let result = watcher.subscribe(vec![subscription]);
1061
1062        // Assert
1063        assert!(result.is_ok());
1064
1065        // Try subscribing to the same deploy hash again
1066        let duplicate_subscription =
1067            Subscription::new(transaction_hash.to_string(), EventHandlerFn::default());
1068        let result_duplicate = watcher.subscribe(vec![duplicate_subscription]);
1069
1070        // Assert
1071        assert!(result_duplicate.is_err());
1072        assert_eq!(
1073            result_duplicate.err().unwrap(),
1074            "Already subscribed to this event"
1075        );
1076    }
1077
1078    #[test]
1079    fn test_unsubscribe() {
1080        // Arrange
1081        let (_, events_url, _, _, _) = get_network_constants();
1082        let mut watcher = Watcher::new(events_url, None);
1083        let transaction_hash = "8c6823d9480eee9fe0cfb5ed1fbf77f928cc6af21121298c05b4e3d87a328271";
1084
1085        // Subscribe to a transaction hash
1086        let transaction_hash_to_subscribe = transaction_hash.to_string();
1087        let subscription = Subscription::new(
1088            transaction_hash_to_subscribe.clone(),
1089            EventHandlerFn::default(),
1090        );
1091        let _ = watcher.subscribe(vec![subscription]);
1092
1093        // Assert that the deploy hash is initially subscribed
1094        assert!(watcher
1095            .subscriptions
1096            .iter()
1097            .any(|s| s.target_hash == transaction_hash_to_subscribe));
1098
1099        // Act
1100        watcher.unsubscribe(transaction_hash_to_subscribe.clone());
1101
1102        // Assert that the deploy hash is unsubscribed after calling unsubscribe
1103        assert!(!watcher
1104            .subscriptions
1105            .iter()
1106            .any(|s| s.target_hash == transaction_hash_to_subscribe));
1107    }
1108
1109    #[test]
1110    #[allow(deprecated)]
1111    fn test_sdk_watch_deploy_retunrs_instance() {
1112        // Arrange
1113        let sdk = SDK::new(None, None, None);
1114        let (_, events_url, _, _, _) = get_network_constants();
1115        let timeout_duration = 5000;
1116
1117        // Act
1118        let watcher = sdk.watch_deploy(&events_url, Some(timeout_duration));
1119
1120        // Assert
1121        assert_eq!(watcher.events_url, events_url);
1122        assert_eq!(watcher.subscriptions.len(), 0);
1123        assert!(*watcher.active.lock().unwrap());
1124        assert_eq!(
1125            watcher.timeout_duration,
1126            Duration::try_milliseconds(timeout_duration.try_into().unwrap()).unwrap()
1127        );
1128    }
1129
1130    #[test]
1131    fn test_sdk_watch_transaction_retunrs_instance() {
1132        // Arrange
1133        let sdk = SDK::new(None, None, None);
1134        let (_, events_url, _, _, _) = get_network_constants();
1135        let timeout_duration = 5000;
1136
1137        // Act
1138        let watcher = sdk.watch_transaction(&events_url, Some(timeout_duration));
1139
1140        // Assert
1141        assert_eq!(watcher.events_url, events_url);
1142        assert_eq!(watcher.subscriptions.len(), 0);
1143        assert!(*watcher.active.lock().unwrap());
1144        assert_eq!(
1145            watcher.timeout_duration,
1146            Duration::try_milliseconds(timeout_duration.try_into().unwrap()).unwrap()
1147        );
1148    }
1149
1150    #[tokio::test]
1151    #[allow(deprecated)]
1152    async fn test_wait_deploy_timeout() {
1153        // Arrange
1154        let sdk = SDK::new(None, None, None);
1155        let (_, events_url, _, _, _) = get_network_constants();
1156        let deploy_hash = "19dbf9bdcd821e55392393c74c86deede02d9434d62d0bc72ab381ce7ea1c4f2";
1157        let timeout_duration = Some(5000);
1158
1159        // Act
1160        let result = sdk
1161            .wait_deploy(&events_url, deploy_hash, timeout_duration)
1162            .await;
1163
1164        // Assert
1165        assert!(result.is_ok());
1166        let event_parse_result = result.unwrap();
1167        assert!(event_parse_result.err.is_some());
1168        assert_eq!(event_parse_result.err, Some("Timeout expired".to_string()));
1169    }
1170
1171    #[tokio::test]
1172    async fn test_wait_transaction_timeout() {
1173        // Arrange
1174        let sdk = SDK::new(None, None, None);
1175        let (_, events_url, _, _, _) = get_network_constants();
1176        let transaction_hash = "8c6823d9480eee9fe0cfb5ed1fbf77f928cc6af21121298c05b4e3d87a328271";
1177        let timeout_duration = Some(5000);
1178
1179        // Act
1180        let result = sdk
1181            .wait_transaction(&events_url, transaction_hash, timeout_duration)
1182            .await;
1183
1184        // Assert
1185        assert!(result.is_ok());
1186        let event_parse_result = result.unwrap();
1187        assert!(event_parse_result.err.is_some());
1188        assert_eq!(event_parse_result.err, Some("Timeout expired".to_string()));
1189    }
1190}