Skip to main content

haste_server/
load_artifacts.rs

1use std::{collections::HashSet, sync::Arc};
2
3use crate::{ServerEnvironmentVariables, fhir_client::ServerCTX, services::create_services};
4use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
5use haste_artifacts::ARTIFACT_RESOURCES;
6use haste_config::Config;
7use haste_fhir_client::{
8    FHIRClient,
9    request::{FHIRSearchTypeRequest, SearchRequest},
10    url::{Parameter, ParsedParameter, ParsedParameters},
11};
12use haste_fhir_model::r4::generated::{
13    resources::{Resource, ResourceType, SearchParameter, StructureDefinition},
14    terminology::IssueType,
15    types::{Coding, FHIRCode, FHIRUri, Meta},
16};
17use haste_fhir_operation_error::OperationOutcomeError;
18use haste_fhir_search::{SearchEngine, SearchOptions};
19use haste_jwt::{ProjectId, TenantId};
20
21use haste_repository::{Repository, fhir::CachePolicy, types::SupportedFHIRVersions};
22use sha1::{Digest, Sha1};
23
24fn generate_sha256_hash(value: &Resource) -> String {
25    let json = serde_json::to_string(value).expect("failed to serialize value.");
26    let mut sha_hasher = Sha1::new();
27    sha_hasher.update(json.as_bytes());
28    let sha1 = sha_hasher.finalize();
29
30    let sha_string = URL_SAFE_NO_PAD.encode(&sha1);
31
32    sha_string
33}
34
/// Coding `system` used for the content-hash tag that is attached to loaded artifacts and
/// used in the `_tag` filter when conditionally updating them.
static HASH_TAG_SYSTEM: &str = "https://haste.health/fhir/CodeSystem/hash";
36
37fn _add_hash_tag(meta: &mut Option<Box<Meta>>, sha_hash: String) {
38    let hash_tag = Box::new(Coding {
39        system: Some(Box::new(FHIRUri {
40            value: Some(HASH_TAG_SYSTEM.to_string()),
41            ..Default::default()
42        })),
43        code: Some(Box::new(FHIRCode {
44            value: Some(sha_hash),
45            ..Default::default()
46        })),
47        ..Default::default()
48    });
49
50    let meta = if let Some(meta) = meta {
51        meta
52    } else {
53        *meta = Some(Box::new(Meta::default()));
54        meta.as_mut().unwrap()
55    };
56
57    match &mut meta.tag {
58        Some(tags) => tags.push(hash_tag),
59        None => meta.tag = Some(vec![hash_tag]),
60    }
61}
62
63fn add_hash_tag(resource: &mut Resource, sha_hash: String) {
64    match resource {
65        Resource::StructureDefinition(structure_definition) => {
66            _add_hash_tag(&mut structure_definition.meta, sha_hash)
67        }
68        Resource::CodeSystem(code_system) => _add_hash_tag(&mut code_system.meta, sha_hash),
69        Resource::ValueSet(value_set) => _add_hash_tag(&mut value_set.meta, sha_hash),
70        Resource::SearchParameter(search_parameter) => {
71            _add_hash_tag(&mut search_parameter.meta, sha_hash)
72        }
73        _ => {}
74    }
75}
76
77fn get_id(resource: &Resource) -> String {
78    match resource {
79        Resource::StructureDefinition(structure_definition) => {
80            structure_definition.id.clone().unwrap_or_default()
81        }
82        Resource::CodeSystem(code_system) => code_system.id.clone().unwrap_or_default(),
83        Resource::ValueSet(value_set) => value_set.id.clone().unwrap_or_default(),
84        Resource::SearchParameter(search_parameter) => {
85            search_parameter.id.clone().unwrap_or_default()
86        }
87        _ => todo!(
88            "Unsupported resource type '{}'",
89            resource.resource_type().as_ref()
90        ),
91    }
92}
93
94pub fn get_resource_type(resource: &Resource) -> ResourceType {
95    match resource {
96        Resource::StructureDefinition(_) => ResourceType::StructureDefinition,
97        Resource::CodeSystem(_) => ResourceType::CodeSystem,
98        Resource::ValueSet(_) => ResourceType::ValueSet,
99        Resource::SearchParameter(_) => ResourceType::SearchParameter,
100        _ => todo!(
101            "Unsupported resource type '{}'",
102            resource.resource_type().as_ref()
103        ),
104    }
105}
106
107/// This deletes existing artifacts and then reloads them. In a single transaction.
108pub async fn reset_artifacts(
109    config: Arc<dyn Config<ServerEnvironmentVariables>>,
110) -> Result<(), OperationOutcomeError> {
111    let services = create_services(config.clone()).await?;
112
113    let transaction = services.transaction().await?;
114
115    {
116        let ctx = Arc::new(ServerCTX::system(
117            TenantId::System,
118            ProjectId::System,
119            transaction.fhir_client.clone(),
120            transaction.rate_limit.clone(),
121        ));
122
123        tracing::info!("Deleting existing CodeSystems");
124        ctx.client
125            .delete_type(
126                ctx.clone(),
127                ResourceType::CodeSystem,
128                ParsedParameters::new(vec![]),
129            )
130            .await?;
131        tracing::info!("Deleting existing ValueSets");
132        ctx.client
133            .delete_type(
134                ctx.clone(),
135                ResourceType::ValueSet,
136                ParsedParameters::new(vec![]),
137            )
138            .await?;
139        tracing::info!("Deleting existing StructureDefinitions");
140        ctx.client
141            .delete_type(
142                ctx.clone(),
143                ResourceType::StructureDefinition,
144                ParsedParameters::new(vec![]),
145            )
146            .await?;
147        tracing::info!("Deleting existing SearchParameters");
148        ctx.client
149            .delete_type(
150                ctx.clone(),
151                ResourceType::SearchParameter,
152                ParsedParameters::new(vec![]),
153            )
154            .await?;
155        _load_artifacts(ctx.clone()).await?;
156    }
157
158    transaction.commit().await?;
159
160    Ok(())
161}
162
163// Used for both reloading artifacts and reset.
164async fn _load_artifacts<Client: FHIRClient<Arc<ServerCTX<Client>>, OperationOutcomeError>>(
165    ctx: Arc<ServerCTX<Client>>,
166) -> Result<(), OperationOutcomeError> {
167    let mut hashes = HashSet::new();
168
169    for resource in ARTIFACT_RESOURCES.iter() {
170        let sha_hash = generate_sha256_hash(*&resource);
171        hashes.insert(sha_hash);
172
173        match &**resource {
174            Resource::SearchParameter(_)
175            | Resource::CodeSystem(_)
176            | Resource::ValueSet(_)
177            | Resource::StructureDefinition(_) => {
178                let mut resource = (**resource).clone();
179                let resource_type = get_resource_type(&resource);
180                let id = get_id(&resource);
181                let sha_hash = generate_sha256_hash(&resource);
182
183                add_hash_tag(&mut resource, sha_hash.clone());
184
185                let res = ctx
186                    .client
187                    .conditional_update(
188                        ctx.clone(),
189                        resource_type.clone(),
190                        ParsedParameters::new(vec![
191                            ParsedParameter::Resource(Parameter {
192                                name: "_id".to_string(),
193                                value: vec![id.clone()],
194                                modifier: None,
195                                chains: None,
196                            }),
197                            ParsedParameter::Resource(Parameter {
198                                name: "_tag".to_string(),
199                                value: vec![HASH_TAG_SYSTEM.to_string() + "|" + sha_hash.as_str()],
200                                modifier: Some("not".to_string()),
201                                chains: None,
202                            }),
203                        ]),
204                        resource.clone(),
205                    )
206                    .await;
207
208                if let Ok(res) = res {
209                    tracing::info!(
210                        "Updated '{}' with id '{}'",
211                        resource_type.as_ref(),
212                        res.id().as_deref().unwrap_or("unknown")
213                    );
214                } else if let Err(err) = res {
215                    let code = err.outcome().issue[0].code.as_ref();
216                    let diagnostic = err.outcome().issue[0]
217                        .diagnostics
218                        .as_deref()
219                        .and_then(|d| d.value.as_ref().map(|v| v.as_str()))
220                        .unwrap_or("unknown");
221
222                    match err.outcome().issue[0].code.as_ref() {
223                        IssueType::Invalid(_) => {
224                            tracing::error!("{:#?}", err);
225                            panic!("INVALID");
226                        }
227                        IssueType::Conflict(None) => {
228                            // Ignore.
229                        }
230                        _ => {
231                            tracing::error!(
232                                "Failed to update '{}' with id '{}'. Issue code: '{:?}', diagnostic: '{}'",
233                                resource_type.as_ref(),
234                                id,
235                                code,
236                                diagnostic
237                            );
238                        }
239                    }
240                }
241            }
242            _ => {
243                // println!("Skipping resource.");
244            }
245        }
246    }
247
248    tracing::info!(
249        "Loaded a total of '{}' artifacts with unique hashes '{}'",
250        ARTIFACT_RESOURCES.len(),
251        hashes.len(),
252    );
253
254    Ok(())
255}
256
257pub async fn load_artifacts(
258    config: Arc<dyn Config<ServerEnvironmentVariables>>,
259) -> Result<(), OperationOutcomeError> {
260    let services = create_services(config.clone()).await?;
261
262    let ctx = Arc::new(ServerCTX::system(
263        TenantId::System,
264        ProjectId::System,
265        services.fhir_client.clone(),
266        services.rate_limit.clone(),
267    ));
268
269    _load_artifacts(ctx.clone()).await
270}
271
272pub async fn get_all_sds<Repo: Repository, Search: SearchEngine>(
273    kinds: &[&str],
274    repo: &Repo,
275    search_engine: &Search,
276) -> Result<Vec<StructureDefinition>, OperationOutcomeError> {
277    let sd_search = FHIRSearchTypeRequest {
278        resource_type: ResourceType::StructureDefinition,
279        parameters: ParsedParameters::new(vec![
280            ParsedParameter::Resource(Parameter {
281                name: "kind".to_string(),
282                value: kinds.iter().map(|s| s.to_string()).collect(),
283                modifier: None,
284                chains: None,
285            }),
286            ParsedParameter::Resource(Parameter {
287                name: "abstract".to_string(),
288                value: vec!["false".to_string()],
289                modifier: None,
290                chains: None,
291            }),
292            ParsedParameter::Resource(Parameter {
293                name: "derivation".to_string(),
294                value: vec!["specialization".to_string()],
295                modifier: None,
296                chains: None,
297            }),
298            // ParsedParameter::Result(Parameter {
299            //     name: "_sort".to_string(),
300            //     value: vec!["url".to_string()],
301            //     modifier: None,
302            //     chains: None,
303            // }),
304        ]),
305    };
306    let sd_results = search_engine
307        .search(
308            &SupportedFHIRVersions::R4,
309            &TenantId::System,
310            &ProjectId::System,
311            &SearchRequest::Type(sd_search),
312            Some(SearchOptions {
313                count_limit: Some(10_000),
314            }),
315        )
316        .await?;
317
318    let version_ids = sd_results
319        .entries
320        .iter()
321        .map(|v| &v.version_id)
322        .collect::<Vec<_>>();
323
324    let sds = repo
325        .read_by_version_ids(
326            &TenantId::System,
327            &ProjectId::System,
328            version_ids.as_slice(),
329            CachePolicy::NoCache,
330        )
331        .await?
332        .into_iter()
333        .filter_map(|r| match r {
334            Resource::StructureDefinition(sd) => Some(sd),
335            _ => None,
336        });
337
338    Ok(sds.collect())
339}
340
341pub async fn get_all_sps<Repo: Repository, Search: SearchEngine>(
342    repo: &Repo,
343    search_engine: &Search,
344) -> Result<Vec<SearchParameter>, OperationOutcomeError> {
345    let sp_search = FHIRSearchTypeRequest {
346        resource_type: ResourceType::SearchParameter,
347        parameters: ParsedParameters::new(vec![]),
348    };
349    let sp_results = search_engine
350        .search(
351            &SupportedFHIRVersions::R4,
352            &TenantId::System,
353            &ProjectId::System,
354            &SearchRequest::Type(sp_search),
355            Some(SearchOptions {
356                count_limit: Some(10_000),
357            }),
358        )
359        .await?;
360
361    let version_ids = sp_results
362        .entries
363        .iter()
364        .map(|v| &v.version_id)
365        .collect::<Vec<_>>();
366
367    let sps = repo
368        .read_by_version_ids(
369            &TenantId::System,
370            &ProjectId::System,
371            version_ids.as_slice(),
372            CachePolicy::NoCache,
373        )
374        .await?
375        .into_iter()
376        .filter_map(|r| match r {
377            Resource::SearchParameter(sp) => Some(sp),
378            _ => None,
379        });
380
381    Ok(sps.collect())
382}