Skip to main content

haste_wal_worker/
es_search_destination.rs

1use etl::destination::Destination;
2use etl::error::EtlResult;
3use etl::types::{Cell, Event, TableId, TableRow};
4use haste_fhir_model::r4::generated::resources::ResourceType;
5use haste_fhir_search::{IndexResource, SearchEngine};
6use haste_jwt::{ProjectId, ResourceId, TenantId, VersionId};
7use haste_repository::types::{FHIRMethod, SupportedFHIRVersions};
8use tracing::{info, warn};
9
10// Important
// ETL does not support generated columns, so id, resource_type, and version_id are not available in replicated rows; in the database they are generated (automatically extracted) from the resource jsonb column.
12// 0  id             | text                     |           | not null | generated always as (resource ->> 'id'::text) stored
13// 3  resource_type  | text                     |           | not null | generated always as (resource ->> 'resourceType'::text) stored
14// 11 version_id     | text                     |           | not null | generated always as ((resource -> 'meta'::text) ->> 'versionId'::text) stored
15
16// Column Order is as follows (defined by schema)
17// 0  tenant         | text                     |           | not null |
18// 1  project        | text                     |           | not null |
19// 2  author_id      | text                     |           | not null |
20// 3  resource       | jsonb                    |           | not null |
21// 4  deleted        | boolean                  |           | not null | false
22// 5  created_at     | timestamp with time zone |           | not null | now()
23// 6  request_method | character varying(7)     |           |          | 'PUT'::character varying
24// 7  fhir_version   | fhir_version             |           | not null |
25// 8  author_type    | text                     |           | not null |
26// 9  fhir_method    | fhir_method              |           | not null |
27// 10  sequence      | bigint                   |           | not null | nextval('resources_sequence_seq'::regclass)
28
29#[derive(Debug, Clone)]
30pub struct ESSearchDestination<Search: SearchEngine> {
31    search_client: Search,
32}
33
34impl<Search: SearchEngine> ESSearchDestination<Search> {
35    pub fn new(search_client: Search) -> EtlResult<Self> {
36        Ok(Self { search_client })
37    }
38}
39
40impl<Search: SearchEngine> Destination for ESSearchDestination<Search> {
41    fn name() -> &'static str {
42        "http"
43    }
44
45    async fn truncate_table(&self, _table_id: TableId) -> EtlResult<()> {
46        warn!(
47            "truncate_table is not implemented for ESSearchDestination as it is not intended to be used for writing table rows directly. Received table_id: {:?}",
48            _table_id
49        );
50        Ok(())
51    }
52
53    async fn write_table_rows(&self, _table_id: TableId, _rows: Vec<TableRow>) -> EtlResult<()> {
54        warn!(
55            "write_table_rows is not implemented for ESSearchDestination as it is not intended to be used for writing table rows directly. Received table_id: {:?} and rows: {:?}",
56            _table_id, _rows
57        );
58        Ok(())
59    }
60
61    async fn write_events(&self, events: Vec<Event>) -> EtlResult<()> {
62        if events.is_empty() {
63            return Ok(());
64        }
65        info!("Writing {} events", events.len());
66
67        let indexed_resources = events
68            .into_iter()
69            .filter_map(|e| {
70                if let Event::Insert(i) = e {
71                    Some(i.table_row.values)
72                } else {
73                    None
74                }
75            })
76            .map(|mut i| {
77                let mut tenant = Cell::Null;
78                let mut project = Cell::Null;
79                let mut resource = Cell::Null;
80                let mut fhir_method = Cell::Null;
81
82                std::mem::swap(&mut tenant, &mut i[0]);
83                std::mem::swap(&mut project, &mut i[1]);
84                std::mem::swap(&mut resource, &mut i[3]);
85                std::mem::swap(&mut fhir_method, &mut i[9]);
86
87                let tenant = match tenant {
88                    Cell::String(tenant) => TenantId::new(tenant),
89                    _ => {
90                        panic!("Unexpected cell type for tenant: {:?}", i[0]);
91                    }
92                };
93                let project = match project {
94                    Cell::String(project) => ProjectId::new(project),
95                    _ => {
96                        panic!("Unexpected cell type for project: {:?}", i[1]);
97                    }
98                };
99                let resource_json = match resource {
100                    Cell::Json(json) => json,
101                    _ => {
102                        panic!("Unexpected cell type for resource: {:?}", i[5]);
103                    }
104                };
105                // account for the 3 popped values
106                let fhir_method = match fhir_method {
107                    Cell::String(fhir_method) => {
108                        FHIRMethod::try_from(fhir_method.as_str()).unwrap()
109                    }
110                    _ => {
111                        panic!("Unexpected cell type for fhir_method: {:?}", i[10 - 3]);
112                    }
113                };
114
115                let id = resource_json
116                    .get("id")
117                    .and_then(|js| js.as_str().map(|s| ResourceId::new(s.to_string())));
118                let version_id = resource_json
119                    .get("meta")
120                    .and_then(|meta| meta.get("versionId"))
121                    .and_then(|version| version.as_str().map(|s| VersionId::new(s.to_string())));
122                let resource_type = resource_json
123                    .get("resourceType")
124                    .and_then(|js| js.as_str().map(|s| ResourceType::try_from(s).unwrap()));
125
126                IndexResource {
127                    id: id.expect("Failed to extract id"),
128                    version_id: version_id.expect("Failed to extract version_id"),
129                    tenant,
130                    project,
131                    fhir_method,
132                    resource_type: resource_type.expect("Failed to extract resource_type"),
133                    resource: haste_fhir_serialization_json::from_serde_value(resource_json)
134                        .unwrap(),
135                }
136            })
137            .collect::<Vec<_>>();
138
139        self.search_client
140            .index(SupportedFHIRVersions::R4, indexed_resources)
141            .await
142            .expect("Failed to index resources in search engine");
143
144        Ok(())
145    }
146}