1use crate::SDK;
2use chrono::{Duration, Utc};
3use futures_util::StreamExt;
4#[cfg(target_arch = "wasm32")]
5use gloo_utils::format::JsValueSerdeExt;
6#[cfg(target_arch = "wasm32")]
7use js_sys::Promise;
8use serde::{Deserialize, Deserializer, Serialize};
9use serde_json::Value;
10use std::{
11 fmt,
12 sync::{Arc, Mutex},
13};
14use wasm_bindgen::prelude::*;
15#[cfg(target_arch = "wasm32")]
16use wasm_bindgen_futures::future_to_promise;
17
18const DEFAULT_TIMEOUT_MS: u64 = 60000;
19
20impl SDK {
21 #[deprecated(note = "prefer 'watch_transaction'")]
33 pub fn watch_deploy(&self, events_url: &str, timeout_duration: Option<u64>) -> Watcher {
34 Watcher::new(events_url.to_string(), timeout_duration)
35 }
36
37 pub fn watch_transaction(&self, events_url: &str, timeout_duration: Option<u64>) -> Watcher {
48 Watcher::new(events_url.to_string(), timeout_duration)
49 }
50
51 #[deprecated(note = "prefer 'wait_transaction' with transaction")]
64 pub async fn wait_deploy(
65 &self,
66 events_url: &str,
67 deploy_hash: &str,
68 timeout_duration: Option<u64>,
69 ) -> Result<EventParseResult, String> {
70 Self::wait_transaction_internal(
71 events_url.to_string(),
72 deploy_hash.to_string(),
73 timeout_duration,
74 )
75 .await
76 }
77
78 pub async fn wait_transaction(
90 &self,
91 events_url: &str,
92 target_hash: &str,
93 timeout_duration: Option<u64>,
94 ) -> Result<EventParseResult, String> {
95 Self::wait_transaction_internal(
96 events_url.to_string(),
97 target_hash.to_string(),
98 timeout_duration,
99 )
100 .await
101 }
102
103 async fn wait_transaction_internal(
115 events_url: String,
116 target_hash: String,
117 timeout_duration: Option<u64>,
118 ) -> Result<EventParseResult, String> {
119 let watcher = Watcher::new(events_url, timeout_duration);
120 let result = watcher.start_internal(Some(target_hash)).await;
121 match result {
122 Some(event_parse_results) => {
123 if let Some(event_parse_result) = event_parse_results.first() {
124 return Ok(event_parse_result.clone());
125 }
126 Err("No first event result".to_string())
127 }
128 None => Err("No event result found".to_string()),
129 }
130 }
131}
132
133#[wasm_bindgen]
134impl SDK {
135 #[cfg(target_arch = "wasm32")]
147 #[wasm_bindgen(js_name = "watchDeploy")]
148 #[deprecated(note = "prefer 'watchTransaction'")]
149 #[allow(deprecated)]
150 pub fn watch_deploy_js_alias(
151 &self,
152 events_url: &str,
153 timeout_duration: Option<u32>,
154 ) -> Watcher {
155 self.watch_deploy(events_url, timeout_duration.map(Into::into))
156 }
157
158 #[cfg(target_arch = "wasm32")]
169 #[wasm_bindgen(js_name = "watchTransaction")]
170 pub fn watch_transaction_js_alias(
171 &self,
172 events_url: &str,
173 timeout_duration: Option<u32>,
174 ) -> Watcher {
175 self.watch_transaction(events_url, timeout_duration.map(Into::into))
176 }
177
178 #[cfg(target_arch = "wasm32")]
191 #[wasm_bindgen(js_name = "waitDeploy")]
192 #[deprecated(note = "prefer 'waitTransaction' with transaction")]
193 #[allow(deprecated)]
194 pub async fn wait_deploy_js_alias(
195 &self,
196 events_url: &str,
197 deploy_hash: &str,
198 timeout_duration: Option<u32>,
199 ) -> Promise {
200 self.wait_transaction_js_alias(events_url, deploy_hash, timeout_duration)
201 .await
202 }
203
204 #[cfg(target_arch = "wasm32")]
216 #[wasm_bindgen(js_name = "waitTransaction")]
217 pub async fn wait_transaction_js_alias(
218 &self,
219 events_url: &str,
220 target_hash: &str,
221 timeout_duration: Option<u32>,
222 ) -> Promise {
223 let events_url = events_url.to_string();
224 let target_hash = target_hash.to_string();
225 let future = async move {
226 let result = Self::wait_transaction_internal(
227 events_url,
228 target_hash,
229 timeout_duration.map(Into::into),
230 )
231 .await;
232 match result {
233 Ok(event_parse_result) => JsValue::from_serde(&event_parse_result)
234 .map_err(|err| JsValue::from_str(&format!("{err}"))),
235 Err(err) => Err(JsValue::from_str(&err)),
236 }
237 };
238
239 future_to_promise(future)
240 }
241}
242
243#[derive(Clone)]
255#[wasm_bindgen]
256pub struct Watcher {
257 events_url: String,
258 subscriptions: Vec<Subscription>,
259 active: Arc<Mutex<bool>>,
260 timeout_duration: Duration,
261}
262
263#[wasm_bindgen]
264impl Watcher {
265 #[wasm_bindgen(constructor)]
277 pub fn new(events_url: String, timeout_duration: Option<u64>) -> Self {
278 let timeout_duration = Duration::try_milliseconds(
279 timeout_duration
280 .unwrap_or(DEFAULT_TIMEOUT_MS)
281 .try_into()
282 .unwrap(),
283 )
284 .unwrap_or_default();
285
286 Watcher {
287 events_url,
288 subscriptions: Vec::new(),
289 active: Arc::new(Mutex::new(true)),
290 timeout_duration,
291 }
292 }
293
294 #[cfg(target_arch = "wasm32")]
304 #[wasm_bindgen(js_name = "subscribe")]
305 pub fn subscribe_js_alias(&mut self, subscriptions: Vec<Subscription>) -> Result<(), String> {
306 self.subscribe(subscriptions)
307 }
308
309 #[wasm_bindgen]
317 pub fn unsubscribe(&mut self, target_hash: String) {
318 self.subscriptions.retain(|s| s.target_hash != target_hash);
319 }
320
321 #[cfg(target_arch = "wasm32")]
327 #[wasm_bindgen(js_name = "start")]
328 pub async fn start_js_alias(&self) -> Result<JsValue, JsError> {
329 let result = match self.start_internal(None).await {
330 Some(res) => res,
331 None => return Ok(JsValue::NULL),
332 };
333
334 let serialized = JsValue::from_serde(&result)
335 .map_err(|err| JsError::new(&format!("Error serializing events: {err:?}")))?;
336
337 Ok(serialized)
338 }
339
340 #[wasm_bindgen]
344 pub fn stop(&self) {
345 {
346 let mut active = self.active.lock().unwrap();
347 *active = false;
348 }
349 }
350}
351
352impl Watcher {
353 pub async fn start(&self) -> Option<Vec<EventParseResult>> {
359 self.start_internal(None).await
360 }
361
362 async fn start_internal(&self, target_hash: Option<String>) -> Option<Vec<EventParseResult>> {
372 {
373 let mut active = self.active.lock().unwrap();
374 *active = true;
375 }
376
377 let client = reqwest::Client::new();
378 let url = self.events_url.clone();
379
380 #[allow(clippy::arc_with_non_send_sync)]
383 let watcher = Arc::new(Mutex::new(self.clone()));
384
385 let start_time = Utc::now();
386 let timeout_duration = self.timeout_duration;
387
388 let response = match client.get(&url).send().await {
389 Ok(res) => res,
390 Err(err) => {
391 let err = err.to_string();
392 let event_parse_result = EventParseResult {
393 err: Some(err.to_string()),
394 body: None,
395 };
396 return Some([event_parse_result].to_vec());
397 }
398 };
399
400 if response.status().is_success() {
401 let buffer_size = 1;
402 let mut buffer = Vec::with_capacity(buffer_size);
403
404 let mut bytes_stream = response.bytes_stream();
405 while let Some(chunk) = bytes_stream.next().await {
406 match chunk {
407 Ok(bytes) => {
408 let this_clone = Arc::clone(&watcher);
409 if !*this_clone.lock().unwrap().active.lock().unwrap() {
410 return None;
411 }
412
413 if Utc::now() - start_time >= timeout_duration {
414 let event_parse_result = EventParseResult {
415 err: Some("Timeout expired".to_string()),
416 body: None,
417 };
418 return Some([event_parse_result].to_vec());
419 }
420
421 buffer.extend_from_slice(&bytes);
422
423 while let Some(index) = buffer.iter().position(|&b| b == b'\n') {
424 let message = buffer.drain(..=index).collect::<Vec<_>>();
425
426 if let Ok(message) = std::str::from_utf8(&message) {
427 let watcher_guard = this_clone.lock().unwrap();
428 let result = watcher_guard
429 .clone()
430 .process_events(message, target_hash.as_deref());
431 match result {
432 Some(event_parse_result) => return Some(event_parse_result),
433 None => {
434 continue;
435 }
436 };
437 } else {
438 let event_parse_result = EventParseResult {
439 err: Some("Error decoding UTF-8 data".to_string()),
440 body: None,
441 };
442 return Some([event_parse_result].to_vec());
443 }
444 }
445 }
446 Err(err) => {
447 let event_parse_result = EventParseResult {
448 err: Some(format!("Error reading chunk: {err}")),
449 body: None,
450 };
451 return Some([event_parse_result].to_vec());
452 }
453 }
454 }
455 } else {
456 let event_parse_result = EventParseResult {
457 err: Some("Failed to fetch stream".to_string()),
458 body: None,
459 };
460 return Some([event_parse_result].to_vec());
461 }
462 None
463 }
464
465 pub fn subscribe(&mut self, subscriptions: Vec<Subscription>) -> Result<(), String> {
475 for new_subscription in &subscriptions {
476 if self
477 .subscriptions
478 .iter()
479 .any(|s| s.target_hash == new_subscription.target_hash)
480 {
481 return Err(String::from("Already subscribed to this event"));
482 }
483 }
484 self.subscriptions.extend(subscriptions);
485 Ok(())
486 }
487
488 fn process_events(
499 mut self,
500 message: &str,
501 target_hash: Option<&str>,
502 ) -> Option<Vec<EventParseResult>> {
503 let data_stream = Self::extract_data_stream(message);
504
505 for data_item in data_stream {
506 let trimmed_item = data_item.trim();
507 let transaction_processed_str = EventName::TransactionProcessed.to_string();
508
509 if !trimmed_item.contains(&transaction_processed_str) {
510 continue;
511 }
512
513 if let Ok(parsed_json) = serde_json::from_str::<Value>(trimmed_item) {
514 let transaction = parsed_json.get(transaction_processed_str);
515 if let Some(transaction_processed) =
516 transaction.and_then(|transaction| transaction.as_object())
517 {
518 if let Some(transaction_hash_processed) = transaction_processed
519 .get("transaction_hash")
520 .and_then(|transaction_hash| {
521 transaction_hash
522 .get("Version1")
523 .or_else(|| transaction_hash.get("Deploy"))
524 .and_then(|transaction_hash| transaction_hash.as_str())
525 })
526 {
527 let mut transaction_hash_found =
528 target_hash == Some(transaction_hash_processed);
529
530 let transaction_processed: Option<TransactionProcessed> =
531 serde_json::from_value(transaction.unwrap().clone()).ok();
532
533 let body = Some(Body {
534 transaction_processed,
535 });
536
537 let event_parse_result = EventParseResult { err: None, body };
538
539 if transaction_hash_found {
540 self.unsubscribe(target_hash.unwrap().to_string());
541 self.stop();
542 return Some([event_parse_result].to_vec());
543 }
544
545 let mut results: Vec<EventParseResult> = [].to_vec();
546 for subscription in self.subscriptions.clone().iter() {
547 if transaction_hash_processed == subscription.target_hash {
548 let event_handler = &subscription.event_handler_fn;
549
550 #[cfg(not(target_arch = "wasm32"))]
551 {
552 event_handler.call(event_parse_result.clone());
553 }
554 #[cfg(target_arch = "wasm32")]
555 {
556 let this = JsValue::null();
557 let args = js_sys::Array::new();
558 args.push(
559 &JsValue::from_serde(&event_parse_result.clone()).unwrap(),
560 );
561 event_handler.apply(&this, &args).unwrap();
562 }
563
564 self.unsubscribe(transaction_hash_processed.to_string());
565 transaction_hash_found = true;
566 results.push(event_parse_result.clone())
567 }
568 }
569
570 if transaction_hash_found && self.subscriptions.is_empty() {
571 self.stop();
572 return Some(results);
573 }
574 }
575 }
576 } else {
577 let event_parse_result = EventParseResult {
578 err: Some("Failed to parse JSON data.".to_string()),
579 body: None,
580 };
581 return Some([event_parse_result].to_vec());
582 }
583 }
584 None
585 }
586
587 fn extract_data_stream(json_data: &str) -> Vec<&str> {
597 let data_stream: Vec<&str> = json_data
598 .split("data:")
599 .filter(|s| !s.is_empty())
600 .map(|s| s.split("id:").next().unwrap_or(""))
601 .collect();
602 data_stream
603 }
604}
605
606pub struct EventHandlerFn(Arc<Mutex<dyn Fn(EventParseResult) + Send + Sync>>);
608
609#[allow(dead_code)]
610impl EventHandlerFn {
611 pub fn new<F>(func: F) -> Self
621 where
622 F: Fn(EventParseResult) + Send + Sync + 'static,
623 {
624 EventHandlerFn(Arc::new(Mutex::new(func)))
625 }
626
627 pub fn call(&self, event_result: EventParseResult) {
633 let func = self.0.lock().unwrap();
634 (*func)(event_result); }
636}
637
638impl fmt::Debug for EventHandlerFn {
639 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
641 write!(f, "EventHandlerFn")
642 }
643}
644
645impl Clone for EventHandlerFn {
646 fn clone(&self) -> Self {
648 EventHandlerFn(self.0.clone())
649 }
650}
651
652impl Default for EventHandlerFn {
653 fn default() -> Self {
655 EventHandlerFn(Arc::new(Mutex::new(|_event_result| {})))
656 }
657}
658
659#[cfg(not(target_arch = "wasm32"))]
661#[derive(Debug, Clone, Default)]
663pub struct Subscription {
664 pub target_hash: String,
666 pub event_handler_fn: EventHandlerFn,
668}
669
/// wasm subscription: pairs a transaction hash with the JavaScript function to
/// call when that transaction is processed.
#[cfg(target_arch = "wasm32")]
#[derive(Clone, Debug, Default)]
#[wasm_bindgen(getter_with_clone)]
pub struct Subscription {
    /// Hash of the transaction this subscription waits for (`targetHash` in JS).
    #[wasm_bindgen(js_name = "targetHash")]
    pub target_hash: String,
    /// JavaScript callback invoked with the parsed event (`eventHandlerFn` in JS).
    #[wasm_bindgen(js_name = "eventHandlerFn")]
    pub event_handler_fn: js_sys::Function,
}
682
683impl Subscription {
684 #[cfg(not(target_arch = "wasm32"))]
691 pub fn new(target_hash: String, event_handler_fn: EventHandlerFn) -> Self {
692 Self {
693 target_hash,
694 event_handler_fn,
695 }
696 }
697}
698
699#[wasm_bindgen]
700impl Subscription {
701 #[cfg(target_arch = "wasm32")]
708 #[wasm_bindgen(constructor)]
709 pub fn new(target_hash: String, event_handler_fn: js_sys::Function) -> Self {
710 Self {
711 target_hash,
712 event_handler_fn,
713 }
714 }
715}
716
717#[derive(Debug, Deserialize, Clone, Default, Serialize)]
719#[wasm_bindgen(getter_with_clone)]
720pub struct Failure {
721 pub cost: String,
722 pub error_message: String,
723}
724
725#[derive(Debug, Deserialize, Clone, Default, Serialize)]
727#[wasm_bindgen(getter_with_clone)]
728pub struct Version2 {
729 pub initiator: PublicKeyString,
730 pub error_message: Option<String>,
731 pub limit: String,
732 pub consumed: String,
733 pub cost: String,
734 }
736
737#[derive(Debug, Deserialize, Clone, Default, Serialize)]
738#[wasm_bindgen(getter_with_clone)]
739pub struct Payment {
740 pub source: String,
741}
742
743#[derive(Debug, Deserialize, Clone, Default, Serialize)]
745#[wasm_bindgen(getter_with_clone)]
746pub struct ExecutionResult {
747 #[serde(rename = "Version2")]
749 #[wasm_bindgen(js_name = "Success")]
750 pub success: Option<Version2>,
751 #[serde(rename = "Failure")]
753 #[wasm_bindgen(js_name = "Failure")]
754 pub failure: Option<Failure>,
755}
756
757#[derive(Debug, Clone, Serialize, Default)]
758#[wasm_bindgen(getter_with_clone)]
759pub struct HashString {
760 pub hash: String,
761}
762
763impl HashString {
764 fn from_hash(hash: String) -> Self {
765 HashString { hash }
766 }
767}
768
769impl<'de> Deserialize<'de> for HashString {
770 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
771 where
772 D: Deserializer<'de>,
773 {
774 let map: std::collections::HashMap<String, String> =
775 Deserialize::deserialize(deserializer)?;
776
777 if let Some(hash) = map.get("Version1").or_else(|| map.get("Deploy")) {
778 Ok(HashString::from_hash(hash.clone()))
779 } else {
780 Err(serde::de::Error::missing_field("Deploy or Version1"))
781 }
782 }
783}
784
785#[wasm_bindgen]
786impl HashString {
787 #[wasm_bindgen(getter, js_name = "Deploy")]
788 pub fn deploy(&self) -> String {
789 self.hash.clone()
790 }
791
792 #[wasm_bindgen(getter, js_name = "Version1")]
793 pub fn version1(&self) -> String {
794 self.hash.clone()
795 }
796
797 #[wasm_bindgen(js_name = "toString")]
798 pub fn to_string_js(&self) -> String {
799 self.to_string()
800 }
801}
802
803impl fmt::Display for HashString {
804 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
805 write!(f, "{}", self.hash)
806 }
807}
808
809#[derive(Debug, Deserialize, Clone, Serialize, Default)]
810#[wasm_bindgen(getter_with_clone)]
811pub struct PublicKeyString {
812 #[serde(rename = "PublicKey")]
813 #[wasm_bindgen(js_name = "PublicKey")]
814 pub public_key: String,
815}
816
817#[derive(Debug, Deserialize, Clone, Serialize, Default)]
818#[wasm_bindgen(getter_with_clone)]
819pub struct Message {
820 #[serde(rename = "String")]
821 #[wasm_bindgen(js_name = "String")]
822 pub string: String,
823}
824
825#[derive(Debug, Deserialize, Clone, Serialize, Default)]
826#[wasm_bindgen(getter_with_clone)]
827pub struct Messages {
828 pub entity_hash: String,
829 pub message: Message,
830 pub topic_name: String,
831 pub topic_name_hash: String,
832 pub topic_index: u32,
833 pub block_index: u64,
834}
835
836#[derive(Debug, Deserialize, Clone, Default, Serialize)]
838#[wasm_bindgen(getter_with_clone)]
839pub struct TransactionProcessed {
840 #[serde(alias = "transaction_hash")]
841 pub hash: HashString,
842 pub initiator_addr: PublicKeyString,
843 pub timestamp: String,
844 pub ttl: String,
845 pub block_hash: String,
846 pub execution_result: ExecutionResult,
848 pub messages: Vec<Messages>,
849}
850
851#[derive(Debug, Deserialize, Clone, Default, Serialize)]
853#[wasm_bindgen(getter_with_clone)]
854pub struct Body {
855 #[serde(rename = "TransactionProcessed")]
856 pub transaction_processed: Option<TransactionProcessed>,
857}
858
859#[wasm_bindgen]
861impl Body {
862 #[wasm_bindgen(getter, js_name = "get_deploy_processed")]
863 #[deprecated(note = "prefer 'get_transaction_processed'")]
864 #[allow(deprecated)]
865 pub fn get_deploy_processed(&self) -> Option<TransactionProcessed> {
866 self.transaction_processed.clone()
867 }
868
869 #[wasm_bindgen(getter, js_name = "get_transaction_processed")]
870 pub fn get_transaction_processed(&self) -> Option<TransactionProcessed> {
871 self.transaction_processed.clone()
872 }
873}
874
875#[derive(Debug, Deserialize, Clone, Default, Serialize)]
877#[wasm_bindgen(getter_with_clone)]
878pub struct EventParseResult {
879 pub err: Option<String>,
880 pub body: Option<Body>,
881}
882
883#[derive(Debug, Deserialize, Clone, Serialize)]
885enum EventName {
886 BlockAdded,
887 TransactionAccepted,
888 TransactionExpired,
889 TransactionProcessed,
890 Step,
891 FinalitySignature,
892 Fault,
893}
894
895impl fmt::Display for EventName {
896 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
898 match self {
899 EventName::BlockAdded => write!(f, "BlockAdded"),
900 EventName::TransactionAccepted => write!(f, "TransactionAccepted"),
901 EventName::TransactionExpired => write!(f, "TransactionExpired"),
902 EventName::TransactionProcessed => write!(f, "TransactionProcessed"),
903 EventName::Step => write!(f, "Step"),
904 EventName::FinalitySignature => write!(f, "FinalitySignature"),
905 EventName::Fault => write!(f, "Fault"),
906 }
907 }
908}
909
#[cfg(test)]
mod tests {
    use super::*;
    use crate::watcher::{deploy_mock::DEPLOY_MOCK, transaction_mock::TRANSACTION_MOCK};
    use sdk_tests::tests::helpers::get_network_constants;
    use tokio;

    /// Hash of the deploy contained in `DEPLOY_MOCK`.
    const DEPLOY_HASH: &str = "19dbf9bdcd821e55392393c74c86deede02d9434d62d0bc72ab381ce7ea1c4f2";
    /// Hash of the transaction contained in `TRANSACTION_MOCK`.
    const TRANSACTION_HASH: &str =
        "8c6823d9480eee9fe0cfb5ed1fbf77f928cc6af21121298c05b4e3d87a328271";
    /// Explicit timeout, in milliseconds, used by most tests.
    const TIMEOUT_MS: u64 = 5000;

    /// Returns the events URL from the shared network constants.
    fn events_url() -> String {
        let (_, events_url, _, _, _) = get_network_constants();
        events_url
    }

    /// Converts milliseconds into the duration type stored by `Watcher`.
    fn millis(ms: u64) -> Duration {
        Duration::try_milliseconds(ms.try_into().unwrap()).unwrap()
    }

    /// Asserts the state every freshly built watcher must have.
    fn assert_fresh_watcher(watcher: &Watcher, expected_url: &str, expected_timeout_ms: u64) {
        assert_eq!(watcher.events_url, expected_url);
        assert!(watcher.subscriptions.is_empty());
        assert!(*watcher.active.lock().unwrap());
        assert_eq!(watcher.timeout_duration, millis(expected_timeout_ms));
    }

    #[test]
    fn test_new() {
        let url = events_url();

        let watcher = Watcher::new(url.clone(), Some(TIMEOUT_MS));

        assert_fresh_watcher(&watcher, &url, TIMEOUT_MS);
    }

    #[test]
    fn test_new_default_timeout() {
        let url = events_url();

        let watcher = Watcher::new(url.clone(), None);

        assert_fresh_watcher(&watcher, &url, DEFAULT_TIMEOUT_MS);
    }

    #[tokio::test]
    async fn test_extract_data_stream() {
        let raw = r#"data:segment1id:data:segment2id:data:segment3id:"#;

        let segments = Watcher::extract_data_stream(raw);

        assert_eq!(segments, ["segment1", "segment2", "segment3"]);
    }

    #[tokio::test]
    #[allow(deprecated)]
    async fn test_process_events_legacy() {
        let watcher = Watcher::new(events_url(), None);

        let outcome = watcher.process_events(DEPLOY_MOCK, Some(DEPLOY_HASH));

        assert!(outcome.is_some());
        let results = outcome.unwrap();
        assert_eq!(results.len(), 1);

        let first = &results[0];
        assert!(first.err.is_none());

        let body = first.body.as_ref().unwrap();
        let processed = body.get_deploy_processed().unwrap();
        assert_eq!(processed.hash.to_string(), DEPLOY_HASH);
    }

    #[tokio::test]
    async fn test_process_events() {
        let watcher = Watcher::new(events_url(), None);

        let outcome = watcher.process_events(TRANSACTION_MOCK, Some(TRANSACTION_HASH));

        assert!(outcome.is_some());
        let results = outcome.unwrap();
        assert_eq!(results.len(), 1);

        let first = &results[0];
        assert!(first.err.is_none());

        let body = first.body.as_ref().unwrap();
        let processed = body.get_transaction_processed().unwrap();
        assert_eq!(processed.hash.to_string(), TRANSACTION_HASH);
    }

    #[tokio::test]
    async fn test_start_timeout() {
        // One millisecond: the first received chunk already exceeds it.
        let watcher = Watcher::new(events_url(), Some(1));

        let outcome = watcher.start().await;

        assert!(outcome.is_some());
        let results = outcome.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].err, Some("Timeout expired".to_string()));
        assert!(results[0].body.is_none());
    }

    #[test]
    fn test_stop() {
        let watcher = Watcher::new(events_url(), None);
        assert!(*watcher.active.lock().unwrap());

        watcher.stop();

        assert!(!(*watcher.active.lock().unwrap()));
    }

    #[test]
    fn test_subscribe() {
        let mut watcher = Watcher::new(events_url(), None);

        let first = Subscription::new(TRANSACTION_HASH.to_string(), EventHandlerFn::default());
        let first_attempt = watcher.subscribe(vec![first]);
        assert!(first_attempt.is_ok());

        // Subscribing to the same hash a second time must be rejected.
        let duplicate = Subscription::new(TRANSACTION_HASH.to_string(), EventHandlerFn::default());
        let second_attempt = watcher.subscribe(vec![duplicate]);

        assert!(second_attempt.is_err());
        assert_eq!(
            second_attempt.err().unwrap(),
            "Already subscribed to this event"
        );
    }

    #[test]
    fn test_unsubscribe() {
        let mut watcher = Watcher::new(events_url(), None);
        let subscribed_hash = TRANSACTION_HASH.to_string();

        let subscription = Subscription::new(subscribed_hash.clone(), EventHandlerFn::default());
        let _ = watcher.subscribe(vec![subscription]);

        let is_subscribed = |watcher: &Watcher| {
            watcher
                .subscriptions
                .iter()
                .any(|s| s.target_hash == subscribed_hash)
        };
        assert!(is_subscribed(&watcher));

        watcher.unsubscribe(subscribed_hash.clone());

        assert!(!is_subscribed(&watcher));
    }

    #[test]
    #[allow(deprecated)]
    fn test_sdk_watch_deploy_retunrs_instance() {
        let sdk = SDK::new(None, None, None);
        let url = events_url();

        let watcher = sdk.watch_deploy(&url, Some(TIMEOUT_MS));

        assert_fresh_watcher(&watcher, &url, TIMEOUT_MS);
    }

    #[test]
    fn test_sdk_watch_transaction_retunrs_instance() {
        let sdk = SDK::new(None, None, None);
        let url = events_url();

        let watcher = sdk.watch_transaction(&url, Some(TIMEOUT_MS));

        assert_fresh_watcher(&watcher, &url, TIMEOUT_MS);
    }

    #[tokio::test]
    #[allow(deprecated)]
    async fn test_wait_deploy_timeout() {
        let sdk = SDK::new(None, None, None);
        let url = events_url();

        let outcome = sdk.wait_deploy(&url, DEPLOY_HASH, Some(TIMEOUT_MS)).await;

        // A timeout is reported inside an `Ok` result, not as `Err`.
        assert!(outcome.is_ok());
        let event_parse_result = outcome.unwrap();
        assert!(event_parse_result.err.is_some());
        assert_eq!(event_parse_result.err, Some("Timeout expired".to_string()));
    }

    #[tokio::test]
    async fn test_wait_transaction_timeout() {
        let sdk = SDK::new(None, None, None);
        let url = events_url();

        let outcome = sdk
            .wait_transaction(&url, TRANSACTION_HASH, Some(TIMEOUT_MS))
            .await;

        // A timeout is reported inside an `Ok` result, not as `Err`.
        assert!(outcome.is_ok());
        let event_parse_result = outcome.unwrap();
        assert!(event_parse_result.err.is_some());
        assert_eq!(event_parse_result.err, Some("Timeout expired".to_string()));
    }
}