1use crate::SDK;
2use chrono::{Duration, Utc};
3use futures_util::StreamExt;
4#[cfg(target_arch = "wasm32")]
5use gloo_utils::format::JsValueSerdeExt;
6#[cfg(target_arch = "wasm32")]
7use js_sys::Promise;
8use serde::{Deserialize, Deserializer, Serialize};
9use serde_json::Value;
10use std::{
11 cell::RefCell,
12 fmt,
13 rc::Rc,
14 sync::{Arc, Mutex},
15};
16use wasm_bindgen::prelude::*;
17#[cfg(target_arch = "wasm32")]
18use wasm_bindgen_futures::future_to_promise;
19
/// Timeout, in milliseconds, applied by `Watcher::new` when the caller passes `None`.
const DEFAULT_TIMEOUT_MS: u64 = 60000;
21
22impl SDK {
23 #[deprecated(note = "prefer 'watch_transaction'")]
35 pub fn watch_deploy(&self, events_url: &str, timeout_duration: Option<u64>) -> Watcher {
36 Watcher::new(events_url.to_string(), timeout_duration)
37 }
38
39 pub fn watch_transaction(&self, events_url: &str, timeout_duration: Option<u64>) -> Watcher {
50 Watcher::new(events_url.to_string(), timeout_duration)
51 }
52
53 #[deprecated(note = "prefer 'wait_transaction' with transaction")]
66 pub async fn wait_deploy(
67 &self,
68 events_url: &str,
69 deploy_hash: &str,
70 timeout_duration: Option<u64>,
71 ) -> Result<EventParseResult, String> {
72 Self::wait_transaction_internal(
73 events_url.to_string(),
74 deploy_hash.to_string(),
75 timeout_duration,
76 )
77 .await
78 }
79
80 pub async fn wait_transaction(
92 &self,
93 events_url: &str,
94 target_hash: &str,
95 timeout_duration: Option<u64>,
96 ) -> Result<EventParseResult, String> {
97 Self::wait_transaction_internal(
98 events_url.to_string(),
99 target_hash.to_string(),
100 timeout_duration,
101 )
102 .await
103 }
104
105 async fn wait_transaction_internal(
117 events_url: String,
118 target_hash: String,
119 timeout_duration: Option<u64>,
120 ) -> Result<EventParseResult, String> {
121 let watcher = Watcher::new(events_url, timeout_duration);
122 let result = watcher.start_internal(Some(target_hash)).await;
123 match result {
124 Some(event_parse_results) => {
125 if let Some(event_parse_result) = event_parse_results.first() {
126 return Ok(event_parse_result.clone());
127 }
128 Err("No first event result".to_string())
129 }
130 None => Err("No event result found".to_string()),
131 }
132 }
133}
134
135#[wasm_bindgen]
136impl SDK {
137 #[cfg(target_arch = "wasm32")]
149 #[wasm_bindgen(js_name = "watchDeploy")]
150 #[deprecated(note = "prefer 'watchTransaction'")]
151 #[allow(deprecated)]
152 pub fn watch_deploy_js_alias(
153 &self,
154 events_url: &str,
155 timeout_duration: Option<u32>,
156 ) -> Watcher {
157 self.watch_deploy(events_url, timeout_duration.map(Into::into))
158 }
159
160 #[cfg(target_arch = "wasm32")]
171 #[wasm_bindgen(js_name = "watchTransaction")]
172 pub fn watch_transaction_js_alias(
173 &self,
174 events_url: &str,
175 timeout_duration: Option<u32>,
176 ) -> Watcher {
177 self.watch_transaction(events_url, timeout_duration.map(Into::into))
178 }
179
180 #[cfg(target_arch = "wasm32")]
193 #[wasm_bindgen(js_name = "waitDeploy")]
194 #[deprecated(note = "prefer 'waitTransaction' with transaction")]
195 #[allow(deprecated)]
196 pub async fn wait_deploy_js_alias(
197 &self,
198 events_url: &str,
199 deploy_hash: &str,
200 timeout_duration: Option<u32>,
201 ) -> Promise {
202 self.wait_transaction_js_alias(events_url, deploy_hash, timeout_duration)
203 .await
204 }
205
206 #[cfg(target_arch = "wasm32")]
218 #[wasm_bindgen(js_name = "waitTransaction")]
219 pub async fn wait_transaction_js_alias(
220 &self,
221 events_url: &str,
222 target_hash: &str,
223 timeout_duration: Option<u32>,
224 ) -> Promise {
225 let events_url = events_url.to_string();
226 let target_hash = target_hash.to_string();
227 let future = async move {
228 let result = Self::wait_transaction_internal(
229 events_url,
230 target_hash,
231 timeout_duration.map(Into::into),
232 )
233 .await;
234 match result {
235 Ok(event_parse_result) => JsValue::from_serde(&event_parse_result)
236 .map_err(|err| JsValue::from_str(&format!("{err}"))),
237 Err(err) => Err(JsValue::from_str(&err)),
238 }
239 };
240
241 future_to_promise(future)
242 }
243}
244
/// Watches an event stream URL and reports processed-transaction events.
///
/// Cloning a `Watcher` copies the URL, the subscription list and the timeout, but
/// the `active` flag lives behind an `Rc` and is therefore shared between clones.
#[derive(Clone)]
#[wasm_bindgen]
pub struct Watcher {
    /// URL the event stream is fetched from.
    events_url: String,
    /// Subscriptions whose handlers are invoked when their target hash is seen.
    subscriptions: Vec<Subscription>,
    /// Run flag: set to `true` on start and to `false` by `stop`.
    active: Rc<RefCell<bool>>,
    /// Maximum time a watch may run before a "Timeout expired" result is reported.
    timeout_duration: Duration,
}
264
265#[wasm_bindgen]
266impl Watcher {
267 #[wasm_bindgen(constructor)]
279 pub fn new(events_url: String, timeout_duration: Option<u64>) -> Self {
280 let timeout_duration = Duration::try_milliseconds(
281 timeout_duration
282 .unwrap_or(DEFAULT_TIMEOUT_MS)
283 .try_into()
284 .unwrap(),
285 )
286 .unwrap_or_default();
287
288 Watcher {
289 events_url,
290 subscriptions: Vec::new(),
291 active: Rc::new(RefCell::new(true)),
292 timeout_duration,
293 }
294 }
295
296 #[cfg(target_arch = "wasm32")]
306 #[wasm_bindgen(js_name = "subscribe")]
307 pub fn subscribe_js_alias(&mut self, subscriptions: Vec<Subscription>) -> Result<(), String> {
308 self.subscribe(subscriptions)
309 }
310
311 #[wasm_bindgen]
319 pub fn unsubscribe(&mut self, target_hash: String) {
320 self.subscriptions.retain(|s| s.target_hash != target_hash);
321 }
322
323 #[cfg(target_arch = "wasm32")]
329 #[wasm_bindgen(js_name = "start")]
330 pub async fn start_js_alias(&self) -> Result<JsValue, JsError> {
331 let result = match self.start_internal(None).await {
332 Some(res) => res,
333 None => return Ok(JsValue::NULL),
334 };
335
336 let serialized = JsValue::from_serde(&result)
337 .map_err(|err| JsError::new(&format!("Error serializing events: {:?}", err)))?;
338
339 Ok(serialized)
340 }
341
342 #[wasm_bindgen]
346 pub fn stop(&self) {
347 *self.active.borrow_mut() = false;
348 }
349}
350
351impl Watcher {
352 pub async fn start(&self) -> Option<Vec<EventParseResult>> {
358 self.start_internal(None).await
359 }
360
361 async fn start_internal(&self, target_hash: Option<String>) -> Option<Vec<EventParseResult>> {
371 *self.active.borrow_mut() = true;
372
373 let client = reqwest::Client::new();
374 let url = self.events_url.clone();
375
376 let watcher = Rc::new(RefCell::new(self.clone()));
377
378 let start_time = Utc::now();
379 let timeout_duration = self.timeout_duration;
380
381 let response = match client.get(&url).send().await {
382 Ok(res) => res,
383 Err(err) => {
384 let err = err.to_string();
385 let event_parse_result = EventParseResult {
386 err: Some(err.to_string()),
387 body: None,
388 };
389 return Some([event_parse_result].to_vec());
390 }
391 };
392
393 if response.status().is_success() {
394 let buffer_size = 1;
395 let mut buffer = Vec::with_capacity(buffer_size);
396
397 let mut bytes_stream = response.bytes_stream();
398 while let Some(chunk) = bytes_stream.next().await {
399 match chunk {
400 Ok(bytes) => {
401 let this_clone = Rc::clone(&watcher);
402 if !*this_clone.borrow_mut().active.borrow() {
403 return None;
404 }
405
406 if Utc::now() - start_time >= timeout_duration {
407 let event_parse_result = EventParseResult {
408 err: Some("Timeout expired".to_string()),
409 body: None,
410 };
411 return Some([event_parse_result].to_vec());
412 }
413
414 buffer.extend_from_slice(&bytes);
415
416 while let Some(index) = buffer.iter().position(|&b| b == b'\n') {
417 let message = buffer.drain(..=index).collect::<Vec<_>>();
418
419 if let Ok(message) = std::str::from_utf8(&message) {
420 let watcher_clone = this_clone.borrow_mut().clone();
421 let result =
422 watcher_clone.process_events(message, target_hash.as_deref());
423 match result {
424 Some(event_parse_result) => return Some(event_parse_result),
425 None => {
426 continue;
427 }
428 };
429 } else {
430 let event_parse_result = EventParseResult {
431 err: Some("Error decoding UTF-8 data".to_string()),
432 body: None,
433 };
434 return Some([event_parse_result].to_vec());
435 }
436 }
437 }
438 Err(err) => {
439 let event_parse_result = EventParseResult {
440 err: Some(format!("Error reading chunk: {}", err)),
441 body: None,
442 };
443 return Some([event_parse_result].to_vec());
444 }
445 }
446 }
447 } else {
448 let event_parse_result = EventParseResult {
449 err: Some("Failed to fetch stream".to_string()),
450 body: None,
451 };
452 return Some([event_parse_result].to_vec());
453 }
454 None
455 }
456
457 pub fn subscribe(&mut self, subscriptions: Vec<Subscription>) -> Result<(), String> {
467 for new_subscription in &subscriptions {
468 if self
469 .subscriptions
470 .iter()
471 .any(|s| s.target_hash == new_subscription.target_hash)
472 {
473 return Err(String::from("Already subscribed to this event"));
474 }
475 }
476 self.subscriptions.extend(subscriptions);
477 Ok(())
478 }
479
480 fn process_events(
491 mut self,
492 message: &str,
493 target_hash: Option<&str>,
494 ) -> Option<Vec<EventParseResult>> {
495 let data_stream = Self::extract_data_stream(message);
496
497 for data_item in data_stream {
498 let trimmed_item = data_item.trim();
499 let transaction_processed_str = EventName::TransactionProcessed.to_string();
500
501 if !trimmed_item.contains(&transaction_processed_str) {
502 continue;
503 }
504
505 if let Ok(parsed_json) = serde_json::from_str::<Value>(trimmed_item) {
506 let transaction = parsed_json.get(transaction_processed_str);
507 if let Some(transaction_processed) =
508 transaction.and_then(|transaction| transaction.as_object())
509 {
510 if let Some(transaction_hash_processed) = transaction_processed
511 .get("transaction_hash")
512 .and_then(|transaction_hash| {
513 transaction_hash
514 .get("Version1")
515 .or_else(|| transaction_hash.get("Deploy"))
516 .and_then(|transaction_hash| transaction_hash.as_str())
517 })
518 {
519 let mut transaction_hash_found =
520 target_hash == Some(transaction_hash_processed);
521
522 let transaction_processed: Option<TransactionProcessed> =
523 serde_json::from_value(transaction.unwrap().clone()).ok();
524
525 let body = Some(Body {
526 transaction_processed,
527 });
528
529 let event_parse_result = EventParseResult { err: None, body };
530
531 if transaction_hash_found {
532 self.unsubscribe(target_hash.unwrap().to_string());
533 self.stop();
534 return Some([event_parse_result].to_vec());
535 }
536
537 let mut results: Vec<EventParseResult> = [].to_vec();
538 for subscription in self.subscriptions.clone().iter() {
539 if transaction_hash_processed == subscription.target_hash {
540 let event_handler = &subscription.event_handler_fn;
541
542 #[cfg(not(target_arch = "wasm32"))]
543 {
544 event_handler.call(event_parse_result.clone());
545 }
546 #[cfg(target_arch = "wasm32")]
547 {
548 let this = JsValue::null();
549 let args = js_sys::Array::new();
550 args.push(
551 &JsValue::from_serde(&event_parse_result.clone()).unwrap(),
552 );
553 event_handler.apply(&this, &args).unwrap();
554 }
555
556 self.unsubscribe(transaction_hash_processed.to_string());
557 transaction_hash_found = true;
558 results.push(event_parse_result.clone())
559 }
560 }
561
562 if transaction_hash_found && self.subscriptions.is_empty() {
563 self.stop();
564 return Some(results);
565 }
566 }
567 }
568 } else {
569 let event_parse_result = EventParseResult {
570 err: Some("Failed to parse JSON data.".to_string()),
571 body: None,
572 };
573 return Some([event_parse_result].to_vec());
574 }
575 }
576 None
577 }
578
579 fn extract_data_stream(json_data: &str) -> Vec<&str> {
589 let data_stream: Vec<&str> = json_data
590 .split("data:")
591 .filter(|s| !s.is_empty())
592 .map(|s| s.split("id:").next().unwrap_or(""))
593 .collect();
594 data_stream
595 }
596}
597
598pub struct EventHandlerFn(Arc<Mutex<dyn Fn(EventParseResult) + Send + Sync>>);
600
601#[allow(dead_code)]
602impl EventHandlerFn {
603 pub fn new<F>(func: F) -> Self
613 where
614 F: Fn(EventParseResult) + Send + Sync + 'static,
615 {
616 EventHandlerFn(Arc::new(Mutex::new(func)))
617 }
618
619 pub fn call(&self, event_result: EventParseResult) {
625 let func = self.0.lock().unwrap();
626 (*func)(event_result); }
628}
629
630impl fmt::Debug for EventHandlerFn {
631 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
633 write!(f, "EventHandlerFn")
634 }
635}
636
637impl Clone for EventHandlerFn {
638 fn clone(&self) -> Self {
640 EventHandlerFn(self.0.clone())
641 }
642}
643
644impl Default for EventHandlerFn {
645 fn default() -> Self {
647 EventHandlerFn(Arc::new(Mutex::new(|_event_result| {})))
648 }
649}
650
/// Native (non-wasm) subscription: a target hash paired with the handler to call
/// when an event for that hash is processed.
#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug, Clone, Default)]
pub struct Subscription {
    /// Hash the subscription is waiting for.
    pub target_hash: String,
    /// Callback invoked with the parsed event.
    pub event_handler_fn: EventHandlerFn,
}
661
/// wasm32 subscription: a target hash paired with the JavaScript function to call
/// when an event for that hash is processed.
#[cfg(target_arch = "wasm32")]
#[derive(Debug, Clone, Default)]
#[wasm_bindgen(getter_with_clone)]
pub struct Subscription {
    /// Hash the subscription is waiting for (exposed to JS as `targetHash`).
    #[wasm_bindgen(js_name = "targetHash")]
    pub target_hash: String,
    /// JavaScript callback invoked with the parsed event (exposed as `eventHandlerFn`).
    #[wasm_bindgen(js_name = "eventHandlerFn")]
    pub event_handler_fn: js_sys::Function,
}
674
impl Subscription {
    /// Creates a native subscription for `target_hash` handled by `event_handler_fn`.
    #[cfg(not(target_arch = "wasm32"))]
    pub fn new(target_hash: String, event_handler_fn: EventHandlerFn) -> Self {
        Self {
            target_hash,
            event_handler_fn,
        }
    }
}
690
#[wasm_bindgen]
impl Subscription {
    /// JS constructor: creates a subscription for `target_hash` handled by the given
    /// JavaScript function.
    #[cfg(target_arch = "wasm32")]
    #[wasm_bindgen(constructor)]
    pub fn new(target_hash: String, event_handler_fn: js_sys::Function) -> Self {
        Self {
            target_hash,
            event_handler_fn,
        }
    }
}
708
/// Failure branch of an [`ExecutionResult`].
#[derive(Debug, Deserialize, Clone, Default, Serialize)]
#[wasm_bindgen(getter_with_clone)]
pub struct Failure {
    /// Cost, carried as a string.
    pub cost: String,
    /// Error message reported with the failure.
    pub error_message: String,
}
716
/// Payload deserialized from the `Version2` key of an [`ExecutionResult`].
#[derive(Debug, Deserialize, Clone, Default, Serialize)]
#[wasm_bindgen(getter_with_clone)]
pub struct Version2 {
    /// Public key of the initiator.
    pub initiator: PublicKeyString,
    /// Error message, if one was reported.
    pub error_message: Option<String>,
    /// Limit, carried as a string.
    pub limit: String,
    /// Consumed amount, carried as a string.
    pub consumed: String,
    /// Cost, carried as a string.
    pub cost: String,
}
728
/// Payment information holding a single `source` string.
// NOTE(review): not referenced by any other item visible in this file; confirm usage.
#[derive(Debug, Deserialize, Clone, Default, Serialize)]
#[wasm_bindgen(getter_with_clone)]
pub struct Payment {
    /// Payment source, carried as a string.
    pub source: String,
}
734
/// Execution result of a processed transaction: a success payload, a failure
/// payload, or neither, depending on which keys are present in the JSON.
#[derive(Debug, Deserialize, Clone, Default, Serialize)]
#[wasm_bindgen(getter_with_clone)]
pub struct ExecutionResult {
    /// Success payload; read from the `Version2` JSON key and exposed to JS as `Success`.
    #[serde(rename = "Version2")]
    #[wasm_bindgen(js_name = "Success")]
    pub success: Option<Version2>,
    /// Failure payload; read from the `Failure` JSON key and exposed to JS as `Failure`.
    #[serde(rename = "Failure")]
    #[wasm_bindgen(js_name = "Failure")]
    pub failure: Option<Failure>,
}
748
/// A hash carried as a plain string.
///
/// Deserialization is implemented by hand and accepts a map keyed by either
/// `Version1` or `Deploy`.
#[derive(Debug, Clone, Serialize, Default)]
#[wasm_bindgen(getter_with_clone)]
pub struct HashString {
    /// The hash value.
    pub hash: String,
}
754
impl HashString {
    /// Wraps an already-extracted hash string.
    fn from_hash(hash: String) -> Self {
        HashString { hash }
    }
}
760
761impl<'de> Deserialize<'de> for HashString {
762 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
763 where
764 D: Deserializer<'de>,
765 {
766 let map: std::collections::HashMap<String, String> =
767 Deserialize::deserialize(deserializer)?;
768
769 if let Some(hash) = map.get("Version1").or_else(|| map.get("Deploy")) {
770 Ok(HashString::from_hash(hash.clone()))
771 } else {
772 Err(serde::de::Error::missing_field("Deploy or Version1"))
773 }
774 }
775}
776
#[wasm_bindgen]
impl HashString {
    /// JS getter `Deploy`: returns a copy of the hash.
    #[wasm_bindgen(getter, js_name = "Deploy")]
    pub fn deploy(&self) -> String {
        self.hash.clone()
    }

    /// JS getter `Version1`: returns a copy of the hash (same value as `Deploy`).
    #[wasm_bindgen(getter, js_name = "Version1")]
    pub fn version1(&self) -> String {
        self.hash.clone()
    }

    /// JS `toString`: returns the `Display` rendering of the hash.
    #[wasm_bindgen(js_name = "toString")]
    pub fn to_string_js(&self) -> String {
        self.to_string()
    }
}
794
795impl fmt::Display for HashString {
796 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
797 write!(f, "{}", self.hash)
798 }
799}
800
/// A public key carried as a string under the `PublicKey` key.
#[derive(Debug, Deserialize, Clone, Serialize, Default)]
#[wasm_bindgen(getter_with_clone)]
pub struct PublicKeyString {
    /// The public key; named `PublicKey` in JSON and in the JS binding.
    #[serde(rename = "PublicKey")]
    #[wasm_bindgen(js_name = "PublicKey")]
    pub public_key: String,
}
808
/// Message content carried as a string under the `String` key.
#[derive(Debug, Deserialize, Clone, Serialize, Default)]
#[wasm_bindgen(getter_with_clone)]
pub struct Message {
    /// The message text; named `String` in JSON and in the JS binding.
    #[serde(rename = "String")]
    #[wasm_bindgen(js_name = "String")]
    pub string: String,
}
816
/// One entry of the `messages` list of a [`TransactionProcessed`] event.
#[derive(Debug, Deserialize, Clone, Serialize, Default)]
#[wasm_bindgen(getter_with_clone)]
pub struct Messages {
    /// Entity hash, carried as a string.
    pub entity_hash: String,
    /// The message payload.
    pub message: Message,
    /// Name of the topic.
    pub topic_name: String,
    /// Hash of the topic name, carried as a string.
    pub topic_name_hash: String,
    /// Topic index.
    pub topic_index: u32,
    /// Block index.
    pub block_index: u64,
}
827
/// Typed view of the payload of a `TransactionProcessed` event.
#[derive(Debug, Deserialize, Clone, Default, Serialize)]
#[wasm_bindgen(getter_with_clone)]
pub struct TransactionProcessed {
    /// Transaction hash; also accepted under the `transaction_hash` key when deserializing.
    #[serde(alias = "transaction_hash")]
    pub hash: HashString,
    /// Public key of the initiator.
    pub initiator_addr: PublicKeyString,
    /// Timestamp, carried as a string.
    pub timestamp: String,
    /// Time to live, carried as a string.
    pub ttl: String,
    /// Block hash, carried as a string.
    pub block_hash: String,
    /// Success or failure details of the execution.
    pub execution_result: ExecutionResult,
    /// Messages attached to the event.
    pub messages: Vec<Messages>,
}
842
/// Body of a parsed event.
#[derive(Debug, Deserialize, Clone, Default, Serialize)]
#[wasm_bindgen(getter_with_clone)]
pub struct Body {
    /// Typed event payload, named `TransactionProcessed` in JSON; `None` when absent.
    #[serde(rename = "TransactionProcessed")]
    pub transaction_processed: Option<TransactionProcessed>,
}
850
851#[wasm_bindgen]
853impl Body {
854 #[wasm_bindgen(getter, js_name = "get_deploy_processed")]
855 #[deprecated(note = "prefer 'get_transaction_processed'")]
856 #[allow(deprecated)]
857 pub fn get_deploy_processed(&self) -> Option<TransactionProcessed> {
858 self.transaction_processed.clone()
859 }
860
861 #[wasm_bindgen(getter, js_name = "get_transaction_processed")]
862 pub fn get_transaction_processed(&self) -> Option<TransactionProcessed> {
863 self.transaction_processed.clone()
864 }
865}
866
/// Outcome reported by the watcher for one event or one failure.
#[derive(Debug, Deserialize, Clone, Default, Serialize)]
#[wasm_bindgen(getter_with_clone)]
pub struct EventParseResult {
    /// Error message; set when the watcher reports a failure (for example a timeout).
    pub err: Option<String>,
    /// Parsed event body; set when an event was matched.
    pub body: Option<Body>,
}
874
/// Names of the event kinds that can appear on the stream.
///
/// Only `TransactionProcessed` is acted upon by the code visible in this file.
#[derive(Debug, Deserialize, Clone, Serialize)]
enum EventName {
    BlockAdded,
    TransactionAccepted,
    TransactionExpired,
    TransactionProcessed,
    Step,
    FinalitySignature,
    Fault,
}
886
887impl fmt::Display for EventName {
888 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
890 match self {
891 EventName::BlockAdded => write!(f, "BlockAdded"),
892 EventName::TransactionAccepted => write!(f, "TransactionAccepted"),
893 EventName::TransactionExpired => write!(f, "TransactionExpired"),
894 EventName::TransactionProcessed => write!(f, "TransactionProcessed"),
895 EventName::Step => write!(f, "Step"),
896 EventName::FinalitySignature => write!(f, "FinalitySignature"),
897 EventName::Fault => write!(f, "Fault"),
898 }
899 }
900}
901
// Unit tests for the watcher. Every test takes its events URL from
// `get_network_constants()`; the tests that call `start`, `wait_deploy` or
// `wait_transaction` issue a real HTTP request to that URL.
// NOTE(review): those tests presumably need a reachable event stream; confirm the
// expected test environment.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::watcher::{deploy_mock::DEPLOY_MOCK, transaction_mock::TRANSACTION_MOCK};
    use sdk_tests::tests::helpers::get_network_constants;
    use tokio;

    // A new watcher stores the URL and timeout, has no subscriptions and is active.
    #[test]
    fn test_new() {
        let (_, events_url, _, _, _) = get_network_constants();
        let timeout_duration = 5000;

        let watcher = Watcher::new(events_url.clone(), Some(timeout_duration));

        assert_eq!(watcher.events_url, events_url);
        assert_eq!(watcher.subscriptions.len(), 0);
        assert!(*watcher.active.borrow());
        assert_eq!(
            watcher.timeout_duration,
            Duration::try_milliseconds(timeout_duration.try_into().unwrap()).unwrap()
        );
    }

    // Passing `None` as timeout falls back to `DEFAULT_TIMEOUT_MS`.
    #[test]
    fn test_new_default_timeout() {
        let (_, events_url, _, _, _) = get_network_constants();

        let watcher = Watcher::new(events_url.clone(), None);

        assert_eq!(watcher.events_url, events_url);
        assert_eq!(watcher.subscriptions.len(), 0);
        assert!(*watcher.active.borrow());
        assert_eq!(
            watcher.timeout_duration,
            Duration::try_milliseconds(DEFAULT_TIMEOUT_MS.try_into().unwrap()).unwrap()
        );
    }

    // Segments are taken after each `data:` marker and cut at the following `id:`.
    #[tokio::test]
    async fn test_extract_data_stream() {
        let json_data = r#"data:segment1id:data:segment2id:data:segment3id:"#;

        let result = Watcher::extract_data_stream(json_data);

        assert_eq!(result, vec!["segment1", "segment2", "segment3"]);
    }

    // The deploy mock yields exactly one error-free result for the targeted hash,
    // readable through the deprecated getter.
    #[tokio::test]
    #[allow(deprecated)]
    async fn test_process_events_legacy() {
        let (_, events_url, _, _, _) = get_network_constants();
        let watcher = Watcher::new(events_url, None);
        let deploy_hash = "19dbf9bdcd821e55392393c74c86deede02d9434d62d0bc72ab381ce7ea1c4f2";

        let target_deploy_hash = Some(deploy_hash);

        let result = watcher.process_events(DEPLOY_MOCK, target_deploy_hash);

        assert!(result.is_some());
        let results = result.unwrap();
        assert_eq!(results.len(), 1);

        let event_parse_result = &results[0];
        assert!(event_parse_result.err.is_none());

        let body = event_parse_result.body.as_ref().unwrap();
        let get_deploy_processed = body.get_deploy_processed().unwrap();
        assert_eq!(get_deploy_processed.hash.to_string(), deploy_hash);
    }

    // The transaction mock yields exactly one error-free result for the targeted hash.
    #[tokio::test]
    async fn test_process_events() {
        let (_, events_url, _, _, _) = get_network_constants();
        let watcher = Watcher::new(events_url, None);
        let transaction_hash = "8c6823d9480eee9fe0cfb5ed1fbf77f928cc6af21121298c05b4e3d87a328271";

        let target_transaction_hash = Some(transaction_hash);

        let result = watcher.process_events(TRANSACTION_MOCK, target_transaction_hash);

        assert!(result.is_some());
        let results = result.unwrap();
        assert_eq!(results.len(), 1);

        let event_parse_result = &results[0];
        assert!(event_parse_result.err.is_none());

        let body = event_parse_result.body.as_ref().unwrap();
        let transaction_processed = body.get_transaction_processed().unwrap();
        assert_eq!(transaction_processed.hash.to_string(), transaction_hash);
    }

    // With a 1 ms timeout the watcher reports a single "Timeout expired" entry.
    #[tokio::test]
    async fn test_start_timeout() {
        let (_, events_url, _, _, _) = get_network_constants();
        let watcher = Watcher::new(events_url, Some(1));

        let result = watcher.start().await;

        assert!(result.is_some());
        let results = result.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].err, Some("Timeout expired".to_string()));
        assert!(results[0].body.is_none());
    }

    // `stop` clears the active flag.
    #[test]
    fn test_stop() {
        let (_, events_url, _, _, _) = get_network_constants();
        let watcher = Watcher::new(events_url, None);
        assert!(*watcher.active.borrow());

        watcher.stop();

        assert!(!(*watcher.active.borrow()));
    }

    // A first subscription is accepted; a second one for the same hash is rejected.
    #[test]
    fn test_subscribe() {
        let (_, events_url, _, _, _) = get_network_constants();
        let mut watcher = Watcher::new(events_url, None);
        let transaction_hash = "8c6823d9480eee9fe0cfb5ed1fbf77f928cc6af21121298c05b4e3d87a328271";

        let subscription =
            Subscription::new(transaction_hash.to_string(), EventHandlerFn::default());

        let result = watcher.subscribe(vec![subscription]);

        assert!(result.is_ok());

        let duplicate_subscription =
            Subscription::new(transaction_hash.to_string(), EventHandlerFn::default());
        let result_duplicate = watcher.subscribe(vec![duplicate_subscription]);

        assert!(result_duplicate.is_err());
        assert_eq!(
            result_duplicate.err().unwrap(),
            "Already subscribed to this event"
        );
    }

    // `unsubscribe` removes the subscription registered for the given hash.
    #[test]
    fn test_unsubscribe() {
        let (_, events_url, _, _, _) = get_network_constants();
        let mut watcher = Watcher::new(events_url, None);
        let transaction_hash = "8c6823d9480eee9fe0cfb5ed1fbf77f928cc6af21121298c05b4e3d87a328271";

        let transaction_hash_to_subscribe = transaction_hash.to_string();
        let subscription = Subscription::new(
            transaction_hash_to_subscribe.clone(),
            EventHandlerFn::default(),
        );
        let _ = watcher.subscribe(vec![subscription]);

        assert!(watcher
            .subscriptions
            .iter()
            .any(|s| s.target_hash == transaction_hash_to_subscribe));

        watcher.unsubscribe(transaction_hash_to_subscribe.clone());

        assert!(!watcher
            .subscriptions
            .iter()
            .any(|s| s.target_hash == transaction_hash_to_subscribe));
    }

    // The deprecated `watch_deploy` returns a watcher configured like `Watcher::new`.
    #[test]
    #[allow(deprecated)]
    fn test_sdk_watch_deploy_retunrs_instance() {
        let sdk = SDK::new(None, None, None);
        let (_, events_url, _, _, _) = get_network_constants();
        let timeout_duration = 5000;

        let watcher = sdk.watch_deploy(&events_url, Some(timeout_duration));

        assert_eq!(watcher.events_url, events_url);
        assert_eq!(watcher.subscriptions.len(), 0);
        assert!(*watcher.active.borrow());
        assert_eq!(
            watcher.timeout_duration,
            Duration::try_milliseconds(timeout_duration.try_into().unwrap()).unwrap()
        );
    }

    // `watch_transaction` returns a watcher configured like `Watcher::new`.
    #[test]
    fn test_sdk_watch_transaction_retunrs_instance() {
        let sdk = SDK::new(None, None, None);
        let (_, events_url, _, _, _) = get_network_constants();
        let timeout_duration = 5000;

        let watcher = sdk.watch_transaction(&events_url, Some(timeout_duration));

        assert_eq!(watcher.events_url, events_url);
        assert_eq!(watcher.subscriptions.len(), 0);
        assert!(*watcher.active.borrow());
        assert_eq!(
            watcher.timeout_duration,
            Duration::try_milliseconds(timeout_duration.try_into().unwrap()).unwrap()
        );
    }

    // Waiting for a hash that does not show up ends with `Ok` carrying a timeout error.
    #[tokio::test]
    #[allow(deprecated)]
    async fn test_wait_deploy_timeout() {
        let sdk = SDK::new(None, None, None);
        let (_, events_url, _, _, _) = get_network_constants();
        let deploy_hash = "19dbf9bdcd821e55392393c74c86deede02d9434d62d0bc72ab381ce7ea1c4f2";
        let timeout_duration = Some(5000);

        let result = sdk
            .wait_deploy(&events_url, deploy_hash, timeout_duration)
            .await;

        assert!(result.is_ok());
        let event_parse_result = result.unwrap();
        assert!(event_parse_result.err.is_some());
        assert_eq!(event_parse_result.err, Some("Timeout expired".to_string()));
    }

    // Same as above through the non-deprecated `wait_transaction` entry point.
    #[tokio::test]
    async fn test_wait_transaction_timeout() {
        let sdk = SDK::new(None, None, None);
        let (_, events_url, _, _, _) = get_network_constants();
        let transaction_hash = "8c6823d9480eee9fe0cfb5ed1fbf77f928cc6af21121298c05b4e3d87a328271";
        let timeout_duration = Some(5000);

        let result = sdk
            .wait_transaction(&events_url, transaction_hash, timeout_duration)
            .await;

        assert!(result.is_ok());
        let event_parse_result = result.unwrap();
        assert!(event_parse_result.err.is_some());
        assert_eq!(event_parse_result.err, Some("Timeout expired".to_string()));
    }
}