haste_fhir_search/elastic_search/
search_parameter_resolver.rs1use elasticsearch::Elasticsearch;
2use haste_fhir_client::{
3 request::{FHIRSearchTypeRequest, SearchRequest},
4 url::ParsedParameters,
5};
6use haste_fhir_model::r4::generated::resources::{Resource, ResourceType};
7use haste_fhir_operation_error::OperationOutcomeError;
8use haste_jwt::{ProjectId, TenantId};
9use haste_repository::{Repository, fhir::CachePolicy};
10use moka::future::{Cache, CacheBuilder};
11use std::sync::{Arc, LazyLock};
12
13use crate::{
14 ResolvedParameter, SearchOptions, SearchParameterResolve,
15 elastic_search::search,
16 memory::{R4_SEARCH_PARAMETERS_INDEX, SearchParametersIndex, create_index_map},
17};
18
19#[derive(Clone)]
20pub struct ElasticSearchParameterResolver<Repo: Repository + Send + Sync> {
21 es: Arc<Elasticsearch>,
22 repo: Arc<Repo>,
23}
24
25static SEARCHPARAMETER_CACHE: LazyLock<Cache<(TenantId, ProjectId), Arc<SearchParametersIndex>>> =
26 LazyLock::new(|| {
27 CacheBuilder::new(50_000)
28 .time_to_idle(std::time::Duration::from_secs(2 * 60 * 60))
30 .build()
31 });
32
33impl<Repo: Repository + Send + Sync> ElasticSearchParameterResolver<Repo> {
34 pub fn new(es: Arc<Elasticsearch>, repo: Arc<Repo>) -> Self {
35 ElasticSearchParameterResolver { es, repo }
36 }
37}
38
39async fn create_project_sp_index<Repo: Repository + Send + Sync>(
40 es: Arc<Elasticsearch>,
41 repo: &Repo,
42 tenant: &TenantId,
43 project: &ProjectId,
44) -> Result<SearchParametersIndex, OperationOutcomeError> {
45 let result = search::execute_search(
46 es,
47 R4_SEARCH_PARAMETERS_INDEX.clone(),
48 &haste_repository::types::SupportedFHIRVersions::R4,
49 tenant,
50 project,
51 &SearchRequest::Type(FHIRSearchTypeRequest {
52 resource_type: ResourceType::SearchParameter,
53 parameters: ParsedParameters::new(vec![]),
54 }),
55 &Some(SearchOptions { count_limit: false }),
56 )
57 .await?;
58
59 let version_ids = result
60 .entries
61 .iter()
62 .map(|r| &r.version_id)
63 .collect::<Vec<_>>();
64
65 let project_sps = repo
66 .read_by_version_ids(tenant, project, &version_ids, CachePolicy::Cache)
67 .await?
68 .into_iter()
69 .filter_map(|r| match r {
70 Resource::SearchParameter(sp) => Some(sp),
71 _ => None,
72 })
73 .collect::<Vec<_>>();
74
75 Ok(create_index_map(
76 &crate::ParameterLevel::Project,
77 project_sps,
78 ))
79}
80
81async fn get_or_create_sp_index_for_project<Repo: Repository + Send + Sync>(
82 es: Arc<Elasticsearch>,
83 repo: &Repo,
84 tenant: TenantId,
85 project: ProjectId,
86) -> Result<Option<Arc<SearchParametersIndex>>, OperationOutcomeError> {
87 match (&tenant, &project) {
88 (TenantId::System, ProjectId::System) => Ok(None),
89 _ => {
90 let index_key = (tenant, project);
91 if let Some(index) = SEARCHPARAMETER_CACHE.get(&index_key).await {
92 Ok(Some(index))
93 } else {
94 let index =
95 Arc::new(create_project_sp_index(es, repo, &index_key.0, &index_key.1).await?);
96 SEARCHPARAMETER_CACHE.insert(index_key, index.clone()).await;
97
98 Ok(Some(index))
99 }
100 }
101 }
102}
103
104impl<Repo: Repository + Send + Sync> SearchParameterResolve
105 for ElasticSearchParameterResolver<Repo>
106{
107 async fn by_resource_type(
108 &self,
109 tenant: &haste_jwt::TenantId,
110 project: &haste_jwt::ProjectId,
111 resource_type: &haste_fhir_model::r4::generated::resources::ResourceType,
112 ) -> Result<Vec<ResolvedParameter>, OperationOutcomeError> {
113 let mut sps_by_resource_type = R4_SEARCH_PARAMETERS_INDEX
114 .by_resource_type(tenant, project, resource_type)
115 .await?;
116
117 if let Some(project_index) = get_or_create_sp_index_for_project(
118 self.es.clone(),
119 self.repo.as_ref(),
120 tenant.clone(),
121 project.clone(),
122 )
123 .await?
124 {
125 let project_sps = project_index
126 .by_resource_type(tenant, project, resource_type)
127 .await?;
128
129 sps_by_resource_type.extend(project_sps);
130 }
131
132 Ok(sps_by_resource_type)
133 }
134
135 async fn by_name(
136 &self,
137 tenant: &haste_jwt::TenantId,
138 project: &haste_jwt::ProjectId,
139 resource_type: Option<&haste_fhir_model::r4::generated::resources::ResourceType>,
140 code: &str,
141 ) -> Result<Option<ResolvedParameter>, OperationOutcomeError> {
142 if let Some(parameter) = R4_SEARCH_PARAMETERS_INDEX
143 .by_name(tenant, project, resource_type, code)
144 .await?
145 {
146 Ok(Some(parameter))
147 } else if let Some(project_index) = get_or_create_sp_index_for_project(
148 self.es.clone(),
149 self.repo.as_ref(),
150 tenant.clone(),
151 project.clone(),
152 )
153 .await?
154 {
155 project_index
156 .by_name(tenant, project, resource_type, code)
157 .await
158 } else {
159 Ok(None)
160 }
161 }
162
163 async fn all(
164 &self,
165 tenant: &haste_jwt::TenantId,
166 project: &haste_jwt::ProjectId,
167 ) -> Result<Vec<ResolvedParameter>, OperationOutcomeError> {
168 let mut all_sps = R4_SEARCH_PARAMETERS_INDEX.all(tenant, project).await?;
169
170 if let Some(project_index) = get_or_create_sp_index_for_project(
171 self.es.clone(),
172 self.repo.as_ref(),
173 tenant.clone(),
174 project.clone(),
175 )
176 .await?
177 {
178 all_sps.extend(project_index.all(tenant, project).await?);
179 }
180
181 Ok(all_sps)
182 }
183}