1use crate::SDK;
2use chrono::{Duration, Utc};
3use futures_util::StreamExt;
4#[cfg(target_arch = "wasm32")]
5use gloo_utils::format::JsValueSerdeExt;
6#[cfg(target_arch = "wasm32")]
7use js_sys::Promise;
8use serde::{Deserialize, Deserializer, Serialize};
9use serde_json::Value;
10use std::{
11 borrow::BorrowMut,
12 fmt,
13 sync::{Arc, Mutex},
14};
15use wasm_bindgen::prelude::*;
16#[cfg(target_arch = "wasm32")]
17use wasm_bindgen_futures::future_to_promise;
18
/// Timeout, in milliseconds, applied by `Watcher::new` when the caller passes `None`.
const DEFAULT_TIMEOUT_MS: u64 = 60000;
20
21impl SDK {
22 #[deprecated(note = "prefer 'watch_transaction'")]
34 pub fn watch_deploy(&self, events_url: &str, timeout_duration: Option<u64>) -> Watcher {
35 Watcher::new(events_url.to_string(), timeout_duration)
36 }
37
38 pub fn watch_transaction(&self, events_url: &str, timeout_duration: Option<u64>) -> Watcher {
49 Watcher::new(events_url.to_string(), timeout_duration)
50 }
51
52 #[deprecated(note = "prefer 'wait_transaction' with transaction")]
65 pub async fn wait_deploy(
66 &self,
67 events_url: &str,
68 deploy_hash: &str,
69 timeout_duration: Option<u64>,
70 ) -> Result<EventParseResult, String> {
71 Self::wait_transaction_internal(
72 events_url.to_string(),
73 deploy_hash.to_string(),
74 timeout_duration,
75 )
76 .await
77 }
78
79 pub async fn wait_transaction(
91 &self,
92 events_url: &str,
93 target_hash: &str,
94 timeout_duration: Option<u64>,
95 ) -> Result<EventParseResult, String> {
96 Self::wait_transaction_internal(
97 events_url.to_string(),
98 target_hash.to_string(),
99 timeout_duration,
100 )
101 .await
102 }
103
104 async fn wait_transaction_internal(
116 events_url: String,
117 target_hash: String,
118 timeout_duration: Option<u64>,
119 ) -> Result<EventParseResult, String> {
120 let watcher = Watcher::new(events_url, timeout_duration);
121 let result = watcher.start_internal(Some(target_hash)).await;
122 match result {
123 Some(event_parse_results) => {
124 if let Some(event_parse_result) = event_parse_results.first() {
125 return Ok(event_parse_result.clone());
126 }
127 Err("No first event result".to_string())
128 }
129 None => Err("No event result found".to_string()),
130 }
131 }
132}
133
134#[wasm_bindgen]
135impl SDK {
136 #[cfg(target_arch = "wasm32")]
148 #[wasm_bindgen(js_name = "watchDeploy")]
149 #[deprecated(note = "prefer 'watchTransaction'")]
150 #[allow(deprecated)]
151 pub fn watch_deploy_js_alias(
152 &self,
153 events_url: &str,
154 timeout_duration: Option<u32>,
155 ) -> Watcher {
156 self.watch_deploy(events_url, timeout_duration.map(Into::into))
157 }
158
159 #[cfg(target_arch = "wasm32")]
170 #[wasm_bindgen(js_name = "watchTransaction")]
171 pub fn watch_transaction_js_alias(
172 &self,
173 events_url: &str,
174 timeout_duration: Option<u32>,
175 ) -> Watcher {
176 self.watch_transaction(events_url, timeout_duration.map(Into::into))
177 }
178
179 #[cfg(target_arch = "wasm32")]
192 #[wasm_bindgen(js_name = "waitDeploy")]
193 #[deprecated(note = "prefer 'waitTransaction' with transaction")]
194 #[allow(deprecated)]
195 pub async fn wait_deploy_js_alias(
196 &self,
197 events_url: &str,
198 deploy_hash: &str,
199 timeout_duration: Option<u32>,
200 ) -> Promise {
201 self.wait_transaction_js_alias(events_url, deploy_hash, timeout_duration)
202 .await
203 }
204
205 #[cfg(target_arch = "wasm32")]
217 #[wasm_bindgen(js_name = "waitTransaction")]
218 pub async fn wait_transaction_js_alias(
219 &self,
220 events_url: &str,
221 target_hash: &str,
222 timeout_duration: Option<u32>,
223 ) -> Promise {
224 let events_url = events_url.to_string();
225 let target_hash = target_hash.to_string();
226 let future = async move {
227 let result = Self::wait_transaction_internal(
228 events_url,
229 target_hash,
230 timeout_duration.map(Into::into),
231 )
232 .await;
233 match result {
234 Ok(event_parse_result) => JsValue::from_serde(&event_parse_result)
235 .map_err(|err| JsValue::from_str(&format!("{err}"))),
236 Err(err) => Err(JsValue::from_str(&err)),
237 }
238 };
239
240 future_to_promise(future)
241 }
242}
243
/// Watches an event stream and reports processed transactions.
///
/// Cloning a `Watcher` copies the URL, subscriptions and timeout, but the clone
/// shares the same `active` flag (it lives behind an `Arc`).
#[derive(Clone)]
#[wasm_bindgen]
pub struct Watcher {
    /// URL of the event stream that is fetched when the watcher starts.
    events_url: String,
    /// Subscriptions whose handlers are invoked when their hash is processed.
    subscriptions: Vec<Subscription>,
    /// Run flag: set to `true` when watching starts and to `false` by `stop`.
    active: Arc<Mutex<bool>>,
    /// Maximum time the watcher keeps reading the stream.
    timeout_duration: Duration,
}
263
264#[wasm_bindgen]
265impl Watcher {
266 #[wasm_bindgen(constructor)]
278 pub fn new(events_url: String, timeout_duration: Option<u64>) -> Self {
279 let timeout_duration = Duration::try_milliseconds(
280 timeout_duration
281 .unwrap_or(DEFAULT_TIMEOUT_MS)
282 .try_into()
283 .unwrap(),
284 )
285 .unwrap_or_default();
286
287 Watcher {
288 events_url,
289 subscriptions: Vec::new(),
290 active: Arc::new(Mutex::new(true)),
291 timeout_duration,
292 }
293 }
294
295 #[cfg(target_arch = "wasm32")]
305 #[wasm_bindgen(js_name = "subscribe")]
306 pub fn subscribe_js_alias(&mut self, subscriptions: Vec<Subscription>) -> Result<(), String> {
307 self.subscribe(subscriptions)
308 }
309
310 #[wasm_bindgen]
318 pub fn unsubscribe(&mut self, target_hash: String) {
319 self.subscriptions.retain(|s| s.target_hash != target_hash);
320 }
321
322 #[cfg(target_arch = "wasm32")]
328 #[wasm_bindgen(js_name = "start")]
329 pub async fn start_js_alias(&self) -> Result<JsValue, JsError> {
330 let result = match self.start_internal(None).await {
331 Some(res) => res,
332 None => return Ok(JsValue::NULL),
333 };
334
335 let serialized = JsValue::from_serde(&result)
336 .map_err(|err| JsError::new(&format!("Error serializing events: {err:?}")))?;
337
338 Ok(serialized)
339 }
340
341 #[wasm_bindgen]
345 pub fn stop(&self) {
346 {
347 let mut active = self.active.lock().unwrap();
348 *active = false;
349 }
350 }
351}
352
353impl Watcher {
354 pub async fn start(&self) -> Option<Vec<EventParseResult>> {
360 self.start_internal(None).await
361 }
362
363 async fn start_internal(&self, target_hash: Option<String>) -> Option<Vec<EventParseResult>> {
373 {
374 let mut active = self.active.lock().unwrap();
375 *active = true;
376 }
377
378 let client = reqwest::Client::new();
379 let url = self.events_url.clone();
380
381 let watcher = Arc::new(Mutex::new(self.clone()));
382
383 let start_time = Utc::now();
384 let timeout_duration = self.timeout_duration;
385
386 let response = match client.get(&url).send().await {
387 Ok(res) => res,
388 Err(err) => {
389 let err = err.to_string();
390 let event_parse_result = EventParseResult {
391 err: Some(err.to_string()),
392 body: None,
393 };
394 return Some([event_parse_result].to_vec());
395 }
396 };
397
398 if response.status().is_success() {
399 let buffer_size = 1;
400 let mut buffer = Vec::with_capacity(buffer_size);
401
402 let mut bytes_stream = response.bytes_stream();
403 while let Some(chunk) = bytes_stream.next().await {
404 match chunk {
405 Ok(bytes) => {
406 let this_clone = Arc::clone(&watcher);
407 if !*this_clone.lock().unwrap().active.lock().unwrap() {
408 return None;
409 }
410
411 if Utc::now() - start_time >= timeout_duration {
412 let event_parse_result = EventParseResult {
413 err: Some("Timeout expired".to_string()),
414 body: None,
415 };
416 return Some([event_parse_result].to_vec());
417 }
418
419 buffer.extend_from_slice(&bytes);
420
421 while let Some(index) = buffer.iter().position(|&b| b == b'\n') {
422 let message = buffer.drain(..=index).collect::<Vec<_>>();
423
424 if let Ok(message) = std::str::from_utf8(&message) {
425 let watcher_guard = this_clone.lock().unwrap();
426 let result = watcher_guard
427 .clone()
428 .process_events(message, target_hash.as_deref());
429 match result {
430 Some(event_parse_result) => return Some(event_parse_result),
431 None => {
432 continue;
433 }
434 };
435 } else {
436 let event_parse_result = EventParseResult {
437 err: Some("Error decoding UTF-8 data".to_string()),
438 body: None,
439 };
440 return Some([event_parse_result].to_vec());
441 }
442 }
443 }
444 Err(err) => {
445 let event_parse_result = EventParseResult {
446 err: Some(format!("Error reading chunk: {err}")),
447 body: None,
448 };
449 return Some([event_parse_result].to_vec());
450 }
451 }
452 }
453 } else {
454 let event_parse_result = EventParseResult {
455 err: Some("Failed to fetch stream".to_string()),
456 body: None,
457 };
458 return Some([event_parse_result].to_vec());
459 }
460 None
461 }
462
463 pub fn subscribe(&mut self, subscriptions: Vec<Subscription>) -> Result<(), String> {
473 for new_subscription in &subscriptions {
474 if self
475 .subscriptions
476 .iter()
477 .any(|s| s.target_hash == new_subscription.target_hash)
478 {
479 return Err(String::from("Already subscribed to this event"));
480 }
481 }
482 self.subscriptions.extend(subscriptions);
483 Ok(())
484 }
485
486 fn process_events(
497 mut self,
498 message: &str,
499 target_hash: Option<&str>,
500 ) -> Option<Vec<EventParseResult>> {
501 let data_stream = Self::extract_data_stream(message);
502
503 for data_item in data_stream {
504 let trimmed_item = data_item.trim();
505 let transaction_processed_str = EventName::TransactionProcessed.to_string();
506
507 if !trimmed_item.contains(&transaction_processed_str) {
508 continue;
509 }
510
511 if let Ok(parsed_json) = serde_json::from_str::<Value>(trimmed_item) {
512 let transaction = parsed_json.get(transaction_processed_str);
513 if let Some(transaction_processed) =
514 transaction.and_then(|transaction| transaction.as_object())
515 {
516 if let Some(transaction_hash_processed) = transaction_processed
517 .get("transaction_hash")
518 .and_then(|transaction_hash| {
519 transaction_hash
520 .get("Version1")
521 .or_else(|| transaction_hash.get("Deploy"))
522 .and_then(|transaction_hash| transaction_hash.as_str())
523 })
524 {
525 let mut transaction_hash_found =
526 target_hash == Some(transaction_hash_processed);
527
528 let transaction_processed: Option<TransactionProcessed> =
529 serde_json::from_value(transaction.unwrap().clone()).ok();
530
531 let body = Some(Body {
532 transaction_processed,
533 });
534
535 let event_parse_result = EventParseResult { err: None, body };
536
537 if transaction_hash_found {
538 self.unsubscribe(target_hash.unwrap().to_string());
539 self.stop();
540 return Some([event_parse_result].to_vec());
541 }
542
543 let mut results: Vec<EventParseResult> = [].to_vec();
544 for subscription in self.subscriptions.clone().iter() {
545 if transaction_hash_processed == subscription.target_hash {
546 let event_handler = &subscription.event_handler_fn;
547
548 #[cfg(not(target_arch = "wasm32"))]
549 {
550 event_handler.call(event_parse_result.clone());
551 }
552 #[cfg(target_arch = "wasm32")]
553 {
554 let this = JsValue::null();
555 let args = js_sys::Array::new();
556 args.push(
557 &JsValue::from_serde(&event_parse_result.clone()).unwrap(),
558 );
559 event_handler.apply(&this, &args).unwrap();
560 }
561
562 self.unsubscribe(transaction_hash_processed.to_string());
563 transaction_hash_found = true;
564 results.push(event_parse_result.clone())
565 }
566 }
567
568 if transaction_hash_found && self.subscriptions.is_empty() {
569 self.stop();
570 return Some(results);
571 }
572 }
573 }
574 } else {
575 let event_parse_result = EventParseResult {
576 err: Some("Failed to parse JSON data.".to_string()),
577 body: None,
578 };
579 return Some([event_parse_result].to_vec());
580 }
581 }
582 None
583 }
584
585 fn extract_data_stream(json_data: &str) -> Vec<&str> {
595 let data_stream: Vec<&str> = json_data
596 .split("data:")
597 .filter(|s| !s.is_empty())
598 .map(|s| s.split("id:").next().unwrap_or(""))
599 .collect();
600 data_stream
601 }
602}
603
/// Shareable callback that receives an [`EventParseResult`].
///
/// The closure is stored behind `Arc<Mutex<..>>`.
pub struct EventHandlerFn(Arc<Mutex<dyn Fn(EventParseResult) + Send + Sync>>);
606
607#[allow(dead_code)]
608impl EventHandlerFn {
609 pub fn new<F>(func: F) -> Self
619 where
620 F: Fn(EventParseResult) + Send + Sync + 'static,
621 {
622 EventHandlerFn(Arc::new(Mutex::new(func)))
623 }
624
625 pub fn call(&self, event_result: EventParseResult) {
631 let func = self.0.lock().unwrap();
632 (*func)(event_result); }
634}
635
636impl fmt::Debug for EventHandlerFn {
637 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
639 write!(f, "EventHandlerFn")
640 }
641}
642
643impl Clone for EventHandlerFn {
644 fn clone(&self) -> Self {
646 EventHandlerFn(self.0.clone())
647 }
648}
649
650impl Default for EventHandlerFn {
651 fn default() -> Self {
653 EventHandlerFn(Arc::new(Mutex::new(|_event_result| {})))
654 }
655}
656
/// Native (non-wasm) subscription: a target hash paired with the handler to call
/// when that hash is processed.
#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug, Clone, Default)]
pub struct Subscription {
    /// Hash this subscription is waiting for.
    pub target_hash: String,
    /// Callback invoked with the parsed event when `target_hash` is processed.
    pub event_handler_fn: EventHandlerFn,
}
667
/// wasm32 subscription: a target hash paired with the JavaScript function to call
/// when that hash is processed.
#[cfg(target_arch = "wasm32")]
#[derive(Debug, Clone, Default)]
#[wasm_bindgen(getter_with_clone)]
pub struct Subscription {
    /// Hash this subscription is waiting for (exposed to JS as `targetHash`).
    #[wasm_bindgen(js_name = "targetHash")]
    pub target_hash: String,
    /// JavaScript callback invoked with the serialized event (exposed to JS as
    /// `eventHandlerFn`).
    #[wasm_bindgen(js_name = "eventHandlerFn")]
    pub event_handler_fn: js_sys::Function,
}
680
681impl Subscription {
682 #[cfg(not(target_arch = "wasm32"))]
689 pub fn new(target_hash: String, event_handler_fn: EventHandlerFn) -> Self {
690 Self {
691 target_hash,
692 event_handler_fn,
693 }
694 }
695}
696
697#[wasm_bindgen]
698impl Subscription {
699 #[cfg(target_arch = "wasm32")]
706 #[wasm_bindgen(constructor)]
707 pub fn new(target_hash: String, event_handler_fn: js_sys::Function) -> Self {
708 Self {
709 target_hash,
710 event_handler_fn,
711 }
712 }
713}
714
/// Payload of the `Failure` entry of an [`ExecutionResult`].
#[derive(Debug, Deserialize, Clone, Default, Serialize)]
#[wasm_bindgen(getter_with_clone)]
pub struct Failure {
    /// Cost, as the string received in the event.
    pub cost: String,
    /// Error message received in the event.
    pub error_message: String,
}
722
/// Payload of the `Version2` entry of an [`ExecutionResult`].
#[derive(Debug, Deserialize, Clone, Default, Serialize)]
#[wasm_bindgen(getter_with_clone)]
pub struct Version2 {
    /// Initiator, wrapped as a public-key string.
    pub initiator: PublicKeyString,
    /// Optional error message.
    pub error_message: Option<String>,
    /// `limit` value, as the string received in the event.
    pub limit: String,
    /// `consumed` value, as the string received in the event.
    pub consumed: String,
    /// `cost` value, as the string received in the event.
    pub cost: String,
}
734
/// Payment information holding a single `source` string.
///
/// NOTE(review): not referenced by any other item in this file.
#[derive(Debug, Deserialize, Clone, Default, Serialize)]
#[wasm_bindgen(getter_with_clone)]
pub struct Payment {
    /// `source` value, as a string.
    pub source: String,
}
740
/// Execution result attached to a processed transaction.
#[derive(Debug, Deserialize, Clone, Default, Serialize)]
#[wasm_bindgen(getter_with_clone)]
pub struct ExecutionResult {
    /// (De)serialized under the key `Version2`; exposed to JS as `Success`.
    #[serde(rename = "Version2")]
    #[wasm_bindgen(js_name = "Success")]
    pub success: Option<Version2>,
    /// (De)serialized under the key `Failure`; exposed to JS as `Failure`.
    #[serde(rename = "Failure")]
    #[wasm_bindgen(js_name = "Failure")]
    pub failure: Option<Failure>,
}
754
/// Transaction hash held as a plain string.
///
/// Serialization is derived; deserialization is hand-written and reads the hash
/// from a map keyed by `Version1` or `Deploy`.
#[derive(Debug, Clone, Serialize, Default)]
#[wasm_bindgen(getter_with_clone)]
pub struct HashString {
    /// The hash string.
    pub hash: String,
}
760
761impl HashString {
762 fn from_hash(hash: String) -> Self {
763 HashString { hash }
764 }
765}
766
767impl<'de> Deserialize<'de> for HashString {
768 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
769 where
770 D: Deserializer<'de>,
771 {
772 let map: std::collections::HashMap<String, String> =
773 Deserialize::deserialize(deserializer)?;
774
775 if let Some(hash) = map.get("Version1").or_else(|| map.get("Deploy")) {
776 Ok(HashString::from_hash(hash.clone()))
777 } else {
778 Err(serde::de::Error::missing_field("Deploy or Version1"))
779 }
780 }
781}
782
783#[wasm_bindgen]
784impl HashString {
785 #[wasm_bindgen(getter, js_name = "Deploy")]
786 pub fn deploy(&self) -> String {
787 self.hash.clone()
788 }
789
790 #[wasm_bindgen(getter, js_name = "Version1")]
791 pub fn version1(&self) -> String {
792 self.hash.clone()
793 }
794
795 #[wasm_bindgen(js_name = "toString")]
796 pub fn to_string_js(&self) -> String {
797 self.to_string()
798 }
799}
800
801impl fmt::Display for HashString {
802 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
803 write!(f, "{}", self.hash)
804 }
805}
806
/// Wrapper for a value (de)serialized as `{ "PublicKey": "<string>" }`.
#[derive(Debug, Deserialize, Clone, Serialize, Default)]
#[wasm_bindgen(getter_with_clone)]
pub struct PublicKeyString {
    /// The string stored under `PublicKey`; exposed to JS as `PublicKey`.
    #[serde(rename = "PublicKey")]
    #[wasm_bindgen(js_name = "PublicKey")]
    pub public_key: String,
}
814
/// Wrapper for a value (de)serialized as `{ "String": "<string>" }`.
#[derive(Debug, Deserialize, Clone, Serialize, Default)]
#[wasm_bindgen(getter_with_clone)]
pub struct Message {
    /// The string stored under `String`; exposed to JS as `String`.
    #[serde(rename = "String")]
    #[wasm_bindgen(js_name = "String")]
    pub string: String,
}
822
/// One entry of the `messages` list of a [`TransactionProcessed`] event.
#[derive(Debug, Deserialize, Clone, Serialize, Default)]
#[wasm_bindgen(getter_with_clone)]
pub struct Messages {
    /// `entity_hash` value, as a string.
    pub entity_hash: String,
    /// The message payload.
    pub message: Message,
    /// `topic_name` value.
    pub topic_name: String,
    /// `topic_name_hash` value, as a string.
    pub topic_name_hash: String,
    /// `topic_index` value.
    pub topic_index: u32,
    /// `block_index` value.
    pub block_index: u64,
}
833
/// Typed form of a `TransactionProcessed` event payload.
#[derive(Debug, Deserialize, Clone, Default, Serialize)]
#[wasm_bindgen(getter_with_clone)]
pub struct TransactionProcessed {
    /// Transaction hash; also accepted under the key `transaction_hash` when
    /// deserializing.
    #[serde(alias = "transaction_hash")]
    pub hash: HashString,
    /// `initiator_addr` value, wrapped as a public-key string.
    pub initiator_addr: PublicKeyString,
    /// `timestamp` value, as the string received in the event.
    pub timestamp: String,
    /// `ttl` value, as the string received in the event.
    pub ttl: String,
    /// `block_hash` value, as a string.
    pub block_hash: String,
    /// Result of executing the transaction.
    pub execution_result: ExecutionResult,
    /// Messages attached to the event.
    pub messages: Vec<Messages>,
}
848
/// Body of a parsed event.
#[derive(Debug, Deserialize, Clone, Default, Serialize)]
#[wasm_bindgen(getter_with_clone)]
pub struct Body {
    /// Parsed event payload, (de)serialized under the key `TransactionProcessed`.
    #[serde(rename = "TransactionProcessed")]
    pub transaction_processed: Option<TransactionProcessed>,
}
856
857#[wasm_bindgen]
859impl Body {
860 #[wasm_bindgen(getter, js_name = "get_deploy_processed")]
861 #[deprecated(note = "prefer 'get_transaction_processed'")]
862 #[allow(deprecated)]
863 pub fn get_deploy_processed(&self) -> Option<TransactionProcessed> {
864 self.transaction_processed.clone()
865 }
866
867 #[wasm_bindgen(getter, js_name = "get_transaction_processed")]
868 pub fn get_transaction_processed(&self) -> Option<TransactionProcessed> {
869 self.transaction_processed.clone()
870 }
871}
872
/// Outcome of parsing one event, or of a failed watch attempt.
#[derive(Debug, Deserialize, Clone, Default, Serialize)]
#[wasm_bindgen(getter_with_clone)]
pub struct EventParseResult {
    /// Error description; `None` when the event was parsed.
    pub err: Option<String>,
    /// Parsed event body; `None` when an error is reported.
    pub body: Option<Body>,
}
880
/// Event names; the `Display` impl writes each variant's name verbatim.
///
/// Only `TransactionProcessed` is used by the watcher logic in this file.
#[derive(Debug, Deserialize, Clone, Serialize)]
enum EventName {
    BlockAdded,
    TransactionAccepted,
    TransactionExpired,
    TransactionProcessed,
    Step,
    FinalitySignature,
    Fault,
}
892
893impl fmt::Display for EventName {
894 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
896 match self {
897 EventName::BlockAdded => write!(f, "BlockAdded"),
898 EventName::TransactionAccepted => write!(f, "TransactionAccepted"),
899 EventName::TransactionExpired => write!(f, "TransactionExpired"),
900 EventName::TransactionProcessed => write!(f, "TransactionProcessed"),
901 EventName::Step => write!(f, "Step"),
902 EventName::FinalitySignature => write!(f, "FinalitySignature"),
903 EventName::Fault => write!(f, "Fault"),
904 }
905 }
906}
907
#[cfg(test)]
mod tests {
    use super::*;
    use crate::watcher::{deploy_mock::DEPLOY_MOCK, transaction_mock::TRANSACTION_MOCK};
    use sdk_tests::tests::helpers::get_network_constants;

    /// Hash looked up in `DEPLOY_MOCK`.
    const DEPLOY_HASH: &str = "19dbf9bdcd821e55392393c74c86deede02d9434d62d0bc72ab381ce7ea1c4f2";
    /// Hash looked up in `TRANSACTION_MOCK`.
    const TRANSACTION_HASH: &str =
        "8c6823d9480eee9fe0cfb5ed1fbf77f928cc6af21121298c05b4e3d87a328271";

    /// Returns the events URL from the shared network constants.
    fn events_url() -> String {
        let (_, events_url, _, _, _) = get_network_constants();
        events_url
    }

    /// Asserts that `watcher` is in its freshly constructed state.
    fn assert_fresh_watcher(watcher: &Watcher, events_url: &str, timeout_ms: u64) {
        assert_eq!(watcher.events_url, events_url);
        assert_eq!(watcher.subscriptions.len(), 0);
        assert!(*watcher.active.lock().unwrap());
        assert_eq!(
            watcher.timeout_duration,
            Duration::try_milliseconds(timeout_ms.try_into().unwrap()).unwrap()
        );
    }

    /// Asserts that `result` is `Ok` and carries only the timeout error.
    fn assert_timed_out(result: Result<EventParseResult, String>) {
        assert!(result.is_ok());
        let event_parse_result = result.unwrap();
        assert!(event_parse_result.err.is_some());
        assert_eq!(event_parse_result.err, Some("Timeout expired".to_string()));
    }

    #[test]
    fn test_new() {
        let events_url = events_url();
        let timeout_duration = 5000;

        let watcher = Watcher::new(events_url.clone(), Some(timeout_duration));

        assert_fresh_watcher(&watcher, &events_url, timeout_duration);
    }

    #[test]
    fn test_new_default_timeout() {
        let events_url = events_url();

        let watcher = Watcher::new(events_url.clone(), None);

        assert_fresh_watcher(&watcher, &events_url, DEFAULT_TIMEOUT_MS);
    }

    #[test]
    fn test_extract_data_stream() {
        let json_data = r#"data:segment1id:data:segment2id:data:segment3id:"#;

        let result = Watcher::extract_data_stream(json_data);

        assert_eq!(result, vec!["segment1", "segment2", "segment3"]);
    }

    #[tokio::test]
    #[allow(deprecated)]
    async fn test_process_events_legacy() {
        let watcher = Watcher::new(events_url(), None);

        let result = watcher.process_events(DEPLOY_MOCK, Some(DEPLOY_HASH));

        assert!(result.is_some());
        let results = result.unwrap();
        assert_eq!(results.len(), 1);

        let event_parse_result = &results[0];
        assert!(event_parse_result.err.is_none());

        let body = event_parse_result.body.as_ref().unwrap();
        let get_deploy_processed = body.get_deploy_processed().unwrap();
        assert_eq!(get_deploy_processed.hash.to_string(), DEPLOY_HASH);
    }

    #[tokio::test]
    async fn test_process_events() {
        let watcher = Watcher::new(events_url(), None);

        let result = watcher.process_events(TRANSACTION_MOCK, Some(TRANSACTION_HASH));

        assert!(result.is_some());
        let results = result.unwrap();
        assert_eq!(results.len(), 1);

        let event_parse_result = &results[0];
        assert!(event_parse_result.err.is_none());

        let body = event_parse_result.body.as_ref().unwrap();
        let transaction_processed = body.get_transaction_processed().unwrap();
        assert_eq!(transaction_processed.hash.to_string(), TRANSACTION_HASH);
    }

    #[tokio::test]
    async fn test_start_timeout() {
        let watcher = Watcher::new(events_url(), Some(1));

        let result = watcher.start().await;

        assert!(result.is_some());
        let results = result.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].err, Some("Timeout expired".to_string()));
        assert!(results[0].body.is_none());
    }

    #[test]
    fn test_stop() {
        let watcher = Watcher::new(events_url(), None);
        assert!(*watcher.active.lock().unwrap());

        watcher.stop();

        assert!(!(*watcher.active.lock().unwrap()));
    }

    #[test]
    fn test_subscribe() {
        let mut watcher = Watcher::new(events_url(), None);

        let subscription =
            Subscription::new(TRANSACTION_HASH.to_string(), EventHandlerFn::default());
        let result = watcher.subscribe(vec![subscription]);
        assert!(result.is_ok());

        // A second subscription for the same hash must be rejected.
        let duplicate_subscription =
            Subscription::new(TRANSACTION_HASH.to_string(), EventHandlerFn::default());
        let result_duplicate = watcher.subscribe(vec![duplicate_subscription]);

        assert!(result_duplicate.is_err());
        assert_eq!(
            result_duplicate.err().unwrap(),
            "Already subscribed to this event"
        );
    }

    #[test]
    fn test_unsubscribe() {
        let mut watcher = Watcher::new(events_url(), None);

        let transaction_hash_to_subscribe = TRANSACTION_HASH.to_string();
        let subscription = Subscription::new(
            transaction_hash_to_subscribe.clone(),
            EventHandlerFn::default(),
        );
        let _ = watcher.subscribe(vec![subscription]);

        assert!(watcher
            .subscriptions
            .iter()
            .any(|s| s.target_hash == transaction_hash_to_subscribe));

        watcher.unsubscribe(transaction_hash_to_subscribe.clone());

        assert!(!watcher
            .subscriptions
            .iter()
            .any(|s| s.target_hash == transaction_hash_to_subscribe));
    }

    #[test]
    #[allow(deprecated)]
    fn test_sdk_watch_deploy_returns_instance() {
        let sdk = SDK::new(None, None, None);
        let events_url = events_url();
        let timeout_duration = 5000;

        let watcher = sdk.watch_deploy(&events_url, Some(timeout_duration));

        assert_fresh_watcher(&watcher, &events_url, timeout_duration);
    }

    #[test]
    fn test_sdk_watch_transaction_returns_instance() {
        let sdk = SDK::new(None, None, None);
        let events_url = events_url();
        let timeout_duration = 5000;

        let watcher = sdk.watch_transaction(&events_url, Some(timeout_duration));

        assert_fresh_watcher(&watcher, &events_url, timeout_duration);
    }

    #[tokio::test]
    #[allow(deprecated)]
    async fn test_wait_deploy_timeout() {
        let sdk = SDK::new(None, None, None);
        let events_url = events_url();
        let timeout_duration = Some(5000);

        let result = sdk
            .wait_deploy(&events_url, DEPLOY_HASH, timeout_duration)
            .await;

        assert_timed_out(result);
    }

    #[tokio::test]
    async fn test_wait_transaction_timeout() {
        let sdk = SDK::new(None, None, None);
        let events_url = events_url();
        let timeout_duration = Some(5000);

        let result = sdk
            .wait_transaction(&events_url, TRANSACTION_HASH, timeout_duration)
            .await;

        assert_timed_out(result);
    }
}