haste_fhir_search/elastic_search/
search_parameter_resolver.rs1use elasticsearch::Elasticsearch;
2use haste_fhir_client::{
3 request::{FHIRSearchTypeRequest, SearchRequest},
4 url::{Parameter, ParsedParameter, ParsedParameters},
5};
6use haste_fhir_model::r4::generated::resources::{Resource, ResourceType};
7use haste_fhir_operation_error::OperationOutcomeError;
8use haste_jwt::{ProjectId, TenantId};
9use haste_repository::{Repository, fhir::CachePolicy};
10use moka::future::{Cache, CacheBuilder};
11use std::sync::{Arc, LazyLock};
12
13use crate::{
14 ResolvedParameter, SearchOptions, SearchParameterResolve,
15 elastic_search::search,
16 memory::{R4_SEARCH_PARAMETERS_INDEX, SearchParametersIndex, create_index_map},
17};
18
19#[derive(Clone)]
20pub struct ElasticSearchParameterResolver<Repo: Repository + Send + Sync> {
21 es: Arc<Elasticsearch>,
22 repo: Arc<Repo>,
23}
24
25static SEARCHPARAMETER_CACHE: LazyLock<Cache<(TenantId, ProjectId), Arc<SearchParametersIndex>>> =
26 LazyLock::new(|| {
27 CacheBuilder::new(50_000)
28 .time_to_idle(std::time::Duration::from_secs(2 * 60 * 60))
30 .build()
31 });
32
33impl<Repo: Repository + Send + Sync> ElasticSearchParameterResolver<Repo> {
34 pub fn new(es: Arc<Elasticsearch>, repo: Arc<Repo>) -> Self {
35 ElasticSearchParameterResolver { es, repo }
36 }
37}
38
39async fn create_project_sp_index<Repo: Repository + Send + Sync>(
40 es: Arc<Elasticsearch>,
41 repo: &Repo,
42 tenant: &TenantId,
43 project: &ProjectId,
44) -> Result<SearchParametersIndex, OperationOutcomeError> {
45 let result = search::execute_search(
46 es,
47 R4_SEARCH_PARAMETERS_INDEX.clone(),
48 &haste_repository::types::SupportedFHIRVersions::R4,
49 tenant,
50 project,
51 &SearchRequest::Type(FHIRSearchTypeRequest {
52 resource_type: ResourceType::SearchParameter,
53 parameters: ParsedParameters::new(vec![ParsedParameter::Resource(Parameter {
54 name: "status".to_string(),
55 value: vec!["active".to_string()],
56 chains: None,
57 modifier: None,
58 })]),
59 }),
60 &Some(SearchOptions {
61 count_limit: Some(10_000),
62 }),
63 )
64 .await?;
65
66 let version_ids = result
67 .entries
68 .iter()
69 .map(|r| &r.version_id)
70 .collect::<Vec<_>>();
71
72 let project_sps = repo
73 .read_by_version_ids(tenant, project, &version_ids, CachePolicy::Cache)
74 .await?
75 .into_iter()
76 .filter_map(|r| match r {
77 Resource::SearchParameter(sp) => Some(sp),
78 _ => None,
79 })
80 .collect::<Vec<_>>();
81
82 Ok(create_index_map(
83 &crate::ParameterLevel::Project,
84 project_sps,
85 ))
86}
87
88async fn get_or_create_sp_index_for_project<Repo: Repository + Send + Sync>(
89 es: Arc<Elasticsearch>,
90 repo: &Repo,
91 tenant: TenantId,
92 project: ProjectId,
93) -> Result<Option<Arc<SearchParametersIndex>>, OperationOutcomeError> {
94 match (&tenant, &project) {
95 (TenantId::System, ProjectId::System) => Ok(None),
96 _ => {
97 let index_key = (tenant, project);
98 if let Some(index) = SEARCHPARAMETER_CACHE.get(&index_key).await {
99 Ok(Some(index))
100 } else {
101 let index =
102 Arc::new(create_project_sp_index(es, repo, &index_key.0, &index_key.1).await?);
103 SEARCHPARAMETER_CACHE.insert(index_key, index.clone()).await;
104
105 Ok(Some(index))
106 }
107 }
108 }
109}
110
111impl<Repo: Repository + Send + Sync> SearchParameterResolve
112 for ElasticSearchParameterResolver<Repo>
113{
114 async fn by_resource_type(
115 &self,
116 tenant: &haste_jwt::TenantId,
117 project: &haste_jwt::ProjectId,
118 resource_type: &haste_fhir_model::r4::generated::resources::ResourceType,
119 ) -> Result<Vec<ResolvedParameter>, OperationOutcomeError> {
120 let mut sps_by_resource_type = R4_SEARCH_PARAMETERS_INDEX
121 .by_resource_type(tenant, project, resource_type)
122 .await?;
123
124 if let Some(project_index) = get_or_create_sp_index_for_project(
125 self.es.clone(),
126 self.repo.as_ref(),
127 tenant.clone(),
128 project.clone(),
129 )
130 .await?
131 {
132 let project_sps = project_index
133 .by_resource_type(tenant, project, resource_type)
134 .await?;
135
136 sps_by_resource_type.extend(project_sps);
137 }
138
139 Ok(sps_by_resource_type)
140 }
141
142 async fn by_name(
143 &self,
144 tenant: &haste_jwt::TenantId,
145 project: &haste_jwt::ProjectId,
146 resource_type: Option<&haste_fhir_model::r4::generated::resources::ResourceType>,
147 code: &str,
148 ) -> Result<Option<ResolvedParameter>, OperationOutcomeError> {
149 if let Some(parameter) = R4_SEARCH_PARAMETERS_INDEX
150 .by_name(tenant, project, resource_type, code)
151 .await?
152 {
153 Ok(Some(parameter))
154 } else if let Some(project_index) = get_or_create_sp_index_for_project(
155 self.es.clone(),
156 self.repo.as_ref(),
157 tenant.clone(),
158 project.clone(),
159 )
160 .await?
161 {
162 project_index
163 .by_name(tenant, project, resource_type, code)
164 .await
165 } else {
166 Ok(None)
167 }
168 }
169
170 async fn all(
171 &self,
172 tenant: &haste_jwt::TenantId,
173 project: &haste_jwt::ProjectId,
174 ) -> Result<Vec<ResolvedParameter>, OperationOutcomeError> {
175 let mut all_sps = R4_SEARCH_PARAMETERS_INDEX.all(tenant, project).await?;
176
177 if let Some(project_index) = get_or_create_sp_index_for_project(
178 self.es.clone(),
179 self.repo.as_ref(),
180 tenant.clone(),
181 project.clone(),
182 )
183 .await?
184 {
185 all_sps.extend(project_index.all(tenant, project).await?);
186 }
187
188 Ok(all_sps)
189 }
190}