haste_wal_worker/
es_search_destination.rs1use etl::destination::Destination;
2use etl::error::EtlResult;
3use etl::types::{Cell, Event, TableId, TableRow};
4use haste_fhir_model::r4::generated::resources::ResourceType;
5use haste_fhir_search::{IndexResource, SearchEngine};
6use haste_jwt::{ProjectId, ResourceId, TenantId, VersionId};
7use haste_repository::types::{FHIRMethod, SupportedFHIRVersions};
8use tracing::{info, warn};
9
10#[derive(Debug, Clone)]
30pub struct ESSearchDestination<Search: SearchEngine> {
31 search_client: Search,
32}
33
34impl<Search: SearchEngine> ESSearchDestination<Search> {
35 pub fn new(search_client: Search) -> EtlResult<Self> {
36 Ok(Self { search_client })
37 }
38}
39
40impl<Search: SearchEngine> Destination for ESSearchDestination<Search> {
41 fn name() -> &'static str {
42 "http"
43 }
44
45 async fn truncate_table(&self, _table_id: TableId) -> EtlResult<()> {
46 warn!(
47 "truncate_table is not implemented for ESSearchDestination as it is not intended to be used for writing table rows directly. Received table_id: {:?}",
48 _table_id
49 );
50 Ok(())
51 }
52
53 async fn write_table_rows(&self, _table_id: TableId, _rows: Vec<TableRow>) -> EtlResult<()> {
54 warn!(
55 "write_table_rows is not implemented for ESSearchDestination as it is not intended to be used for writing table rows directly. Received table_id: {:?} and rows: {:?}",
56 _table_id, _rows
57 );
58 Ok(())
59 }
60
61 async fn write_events(&self, events: Vec<Event>) -> EtlResult<()> {
62 if events.is_empty() {
63 return Ok(());
64 }
65 info!("Writing {} events", events.len());
66
67 let indexed_resources = events
68 .into_iter()
69 .filter_map(|e| {
70 if let Event::Insert(i) = e {
71 Some(i.table_row.values)
72 } else {
73 None
74 }
75 })
76 .map(|mut i| {
77 let mut tenant = Cell::Null;
78 let mut project = Cell::Null;
79 let mut resource = Cell::Null;
80 let mut fhir_method = Cell::Null;
81
82 std::mem::swap(&mut tenant, &mut i[0]);
83 std::mem::swap(&mut project, &mut i[1]);
84 std::mem::swap(&mut resource, &mut i[3]);
85 std::mem::swap(&mut fhir_method, &mut i[9]);
86
87 let tenant = match tenant {
88 Cell::String(tenant) => TenantId::new(tenant),
89 _ => {
90 panic!("Unexpected cell type for tenant: {:?}", i[0]);
91 }
92 };
93 let project = match project {
94 Cell::String(project) => ProjectId::new(project),
95 _ => {
96 panic!("Unexpected cell type for project: {:?}", i[1]);
97 }
98 };
99 let resource_json = match resource {
100 Cell::Json(json) => json,
101 _ => {
102 panic!("Unexpected cell type for resource: {:?}", i[5]);
103 }
104 };
105 let fhir_method = match fhir_method {
107 Cell::String(fhir_method) => {
108 FHIRMethod::try_from(fhir_method.as_str()).unwrap()
109 }
110 _ => {
111 panic!("Unexpected cell type for fhir_method: {:?}", i[10 - 3]);
112 }
113 };
114
115 let id = resource_json
116 .get("id")
117 .and_then(|js| js.as_str().map(|s| ResourceId::new(s.to_string())));
118 let version_id = resource_json
119 .get("meta")
120 .and_then(|meta| meta.get("versionId"))
121 .and_then(|version| version.as_str().map(|s| VersionId::new(s.to_string())));
122 let resource_type = resource_json
123 .get("resourceType")
124 .and_then(|js| js.as_str().map(|s| ResourceType::try_from(s).unwrap()));
125
126 IndexResource {
127 id: id.expect("Failed to extract id"),
128 version_id: version_id.expect("Failed to extract version_id"),
129 tenant,
130 project,
131 fhir_method,
132 resource_type: resource_type.expect("Failed to extract resource_type"),
133 resource: haste_fhir_serialization_json::from_serde_value(resource_json)
134 .unwrap(),
135 }
136 })
137 .collect::<Vec<_>>();
138
139 self.search_client
140 .index(SupportedFHIRVersions::R4, indexed_resources)
141 .await
142 .expect("Failed to index resources in search engine");
143
144 Ok(())
145 }
146}