1use crate::indexing_lock::IndexLockProvider;
2use haste_config::get_config;
3use haste_fhir_model::r4::generated::resources::ResourceTypeError;
4use haste_fhir_operation_error::{OperationOutcomeError, derive::OperationOutcomeError};
5use haste_fhir_search::{
6 IndexResource, SearchEngine,
7 elastic_search::{
8 ElasticSearchEngine, create_es_client,
9 search_parameter_resolver::ElasticSearchParameterResolver,
10 },
11};
12use haste_fhirpath::FHIRPathError;
13use haste_jwt::{TenantId, VersionId};
14use haste_repository::{fhir::FHIRRepository, types::SupportedFHIRVersions};
15use sqlx::{Pool, Postgres, query_as, types::time::OffsetDateTime};
16use std::sync::Arc;
17use tokio::sync::Mutex;
18
19mod indexing_lock;
20
/// Errors that can occur while the indexing worker reads tenants/resources and writes
/// them to the search engine.
///
/// The `OperationOutcomeError` derive makes these convertible into an
/// `OperationOutcomeError` (as `get_tenants` relies on via `map_err` + `?`); every variant
/// is declared `fatal` with issue code `exception`.
#[derive(OperationOutcomeError, Debug)]
pub enum IndexingWorkerError {
    /// Any `sqlx` error (not only connection failures), e.g. from the tenant query.
    #[fatal(code = "exception", diagnostic = "Database error: '{arg0}'")]
    DatabaseConnectionError(#[from] sqlx::Error),
    /// Any `OperationOutcomeError` converted via `#[from]` (presumably from repository,
    /// lock, transaction and search-engine calls — their error types are not visible here).
    /// NOTE(review): the diagnostic says "Lock error" although this variant wraps every
    /// converted `OperationOutcomeError`, not only lock failures.
    #[fatal(code = "exception", diagnostic = "Lock error: '{arg0}'")]
    OperationError(#[from] OperationOutcomeError),
    /// An error reported by the `elasticsearch` client.
    #[fatal(code = "exception", diagnostic = "Elasticsearch error: '{arg0}'")]
    ElasticsearchError(#[from] elasticsearch::Error),
    /// A FHIRPath error, converted via `#[from]`.
    #[fatal(code = "exception", diagnostic = "FHIRPath error: '{arg0}'")]
    FHIRPathError(#[from] FHIRPathError),
    /// Search parameters for the named resource are missing; the payload is the resource
    /// name. Not constructed in the code visible here.
    #[fatal(
        code = "exception",
        diagnostic = "Missing search parameters for resource: '{arg0}'"
    )]
    MissingSearchParameters(String),
    /// Unrecoverable indexing failure; `index_tenant_next_sequence` returns it when the
    /// search engine reports a different number of indexed resources than were read.
    #[fatal(
        code = "exception",
        diagnostic = "Fatal error occurred during indexing"
    )]
    Fatal,
    /// An invalid resource type, converted from `ResourceTypeError`.
    #[fatal(
        code = "exception",
        diagnostic = "Artifact error: Invalid resource type '{arg0}'"
    )]
    ResourceTypeError(#[from] ResourceTypeError),
}
47
/// Row produced by the tenant query in `get_tenants`: a tenant id and its creation time.
///
/// Field names must match the column names/aliases selected by that `query_as!` query.
struct TenantReturn {
    /// Tenant identifier (selected as `id` from `tenants`).
    id: TenantId,
    /// Creation timestamp; `run_worker` uses it as the pagination cursor between pages.
    created_at: OffsetDateTime,
}
52
53async fn get_tenants(
54 client: &Pool<Postgres>,
55 cursor: &OffsetDateTime,
56 count: usize,
57) -> Result<Vec<TenantReturn>, OperationOutcomeError> {
58 let result = query_as!(
59 TenantReturn,
60 r#"SELECT id as "id: TenantId", created_at FROM tenants WHERE created_at > $1 ORDER BY created_at DESC LIMIT $2"#,
61 cursor,
62 count as i64
63 )
64 .fetch_all(client)
65 .await
66 .map_err(IndexingWorkerError::from)?;
67
68 Ok(result)
69}
70
71static TOTAL_INDEXED: std::sync::LazyLock<Mutex<usize>> =
72 std::sync::LazyLock::new(|| Mutex::new(0));
73
74async fn index_tenant_next_sequence<
75 Repo: FHIRRepository + IndexLockProvider,
76 Engine: SearchEngine,
77>(
78 search_client: Arc<Engine>,
79 tx: &Repo,
80 tenant_id: &TenantId,
81) -> Result<(), IndexingWorkerError> {
82 let start = std::time::Instant::now();
83 let tenant_locks = tx.get_available_locks(vec![tenant_id]).await?;
84
85 if tenant_locks.is_empty() {
86 return Ok(());
87 }
88
89 let resources = tx
90 .get_sequence(
91 tenant_id,
92 tenant_locks[0].index_sequence_position as u64,
93 Some(1000),
94 )
95 .await?;
96
97 let resources_total = resources.len();
98 let start_sequence = resources.first().map(|r| r.sequence);
99 let last_value = resources.last().cloned();
100
101 if !resources.is_empty() {
103 let result = search_client
104 .index(
105 SupportedFHIRVersions::R4,
106 resources
107 .into_iter()
108 .map(|r| IndexResource {
109 tenant: r.tenant,
110 id: r.id,
111 version_id: VersionId::new(r.version_id),
112 project: r.project,
113 fhir_method: r.fhir_method,
114 resource_type: r.resource_type,
115 resource: r.resource.0,
116 })
117 .collect(),
118 )
119 .await?;
120
121 if result.0 != resources_total {
122 tracing::error!(
123 "Indexed resource count '{}' does not match retrieved resource count '{}'",
124 result.0,
125 resources_total
126 );
127 return Err(IndexingWorkerError::Fatal);
128 }
129
130 if let Some(resource) = last_value {
131 let diff = (resource.sequence + 1) - start_sequence.unwrap_or(0);
132 let total = resources_total;
133
134 if total != diff as usize {
135 tracing::event!(
136 tracing::Level::INFO,
137 first_seq = start_sequence.unwrap_or(0),
139 last_seq = resource.sequence,
140 total = resources_total,
141 diff = (resource.sequence + 1) - start_sequence.unwrap_or(0)
142 );
143 }
144
145 tx.update_lock(tenant_id.as_ref(), resource.sequence as usize)
146 .await?;
147
148 let elapsed = start.elapsed();
149 tracing::info!(
150 "Indexed {} resources for tenant '{}' in {:.2?} (up to sequence {})",
151 result.0,
152 tenant_id.as_ref(),
153 elapsed,
154 resource.sequence
155 );
156 }
157
158 *(TOTAL_INDEXED.lock().await) += result.0;
159 }
160
161 Ok(())
162}
163
164async fn index_for_tenant<Search: SearchEngine, Repository: FHIRRepository + IndexLockProvider>(
165 repo: Arc<Repository>,
166 search_client: Arc<Search>,
167 tenant_id: &TenantId,
168) -> Result<(), IndexingWorkerError> {
169 let search_client = search_client.clone();
170
171 let tx = repo.transaction(false).await.unwrap();
172
173 let res = index_tenant_next_sequence(search_client, &tx, &tenant_id).await;
174
175 match res {
176 Ok(res) => {
177 tx.commit().await?;
178 Ok(res)
179 }
180 Err(e) => {
181 tx.rollback().await?;
182 Err(e)
183 }
184 }
185}
186
/// Configuration variables required by the indexing worker.
///
/// Each variant converts (via `String::from`) into an environment-variable name such as
/// `DATABASE_URL`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IndexingWorkerEnvironmentVariables {
    /// Postgres connection URL (`DATABASE_URL`).
    DatabaseURL,
    /// Elasticsearch endpoint URL (`ELASTICSEARCH_URL`).
    ElasticSearchURL,
    /// Elasticsearch username (`ELASTICSEARCH_USERNAME`).
    ElasticSearchUsername,
    /// Elasticsearch password (`ELASTICSEARCH_PASSWORD`).
    ElasticSearchPassword,
}
193
194impl From<IndexingWorkerEnvironmentVariables> for String {
195 fn from(value: IndexingWorkerEnvironmentVariables) -> Self {
196 match value {
197 IndexingWorkerEnvironmentVariables::DatabaseURL => "DATABASE_URL".to_string(),
198 IndexingWorkerEnvironmentVariables::ElasticSearchURL => "ELASTICSEARCH_URL".to_string(),
199 IndexingWorkerEnvironmentVariables::ElasticSearchUsername => {
200 "ELASTICSEARCH_USERNAME".to_string()
201 }
202 IndexingWorkerEnvironmentVariables::ElasticSearchPassword => {
203 "ELASTICSEARCH_PASSWORD".to_string()
204 }
205 }
206 }
207}
208
209pub async fn run_worker() -> Result<(), OperationOutcomeError> {
210 let config = get_config::<IndexingWorkerEnvironmentVariables>("environment".into());
211 let fp_engine = Arc::new(haste_fhirpath::FPEngine::new());
212
213 let pg_pool = sqlx::PgPool::connect(
214 &config
215 .get(IndexingWorkerEnvironmentVariables::DatabaseURL)
216 .unwrap(),
217 )
218 .await
219 .expect("Failed to connect to the database");
220 let repo = Arc::new(haste_repository::pg::PGConnection::pool(pg_pool.clone()));
221 let es_client = create_es_client(
222 &config
223 .get(IndexingWorkerEnvironmentVariables::ElasticSearchURL)
224 .expect(&format!(
225 "'{}' variable not set",
226 String::from(IndexingWorkerEnvironmentVariables::ElasticSearchURL)
227 )),
228 config
229 .get(IndexingWorkerEnvironmentVariables::ElasticSearchUsername)
230 .expect(&format!(
231 "'{}' variable not set",
232 String::from(IndexingWorkerEnvironmentVariables::ElasticSearchUsername)
233 )),
234 config
235 .get(IndexingWorkerEnvironmentVariables::ElasticSearchPassword)
236 .expect(&format!(
237 "'{}' variable not set",
238 String::from(IndexingWorkerEnvironmentVariables::ElasticSearchPassword)
239 )),
240 )
241 .expect("Failed to create Elasticsearch client");
242
243 let search_engine = Arc::new(ElasticSearchEngine::new(
244 Arc::new(ElasticSearchParameterResolver::new(
245 es_client.clone(),
246 repo.clone(),
247 )),
248 fp_engine.clone(),
249 es_client,
250 ));
251
252 let mut attempts = 0;
253 while !search_engine.is_connected().await.is_ok() && attempts < 5 {
254 tracing::error!("Elasticsearch is not connected, retrying in 5 seconds...");
255 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
256 attempts += 1;
257 }
258
259 let mut cursor = OffsetDateTime::UNIX_EPOCH;
260 let tenants_limit: usize = 100;
261
262 tracing::info!("Starting indexing worker...");
263
264 let mut k = *TOTAL_INDEXED.lock().await;
265
266 loop {
267 let tenants_to_check = get_tenants(&pg_pool, &cursor, tenants_limit).await;
268 if let Ok(tenants_to_check) = tenants_to_check {
269 if tenants_to_check.is_empty() || tenants_to_check.len() < tenants_limit {
270 cursor = OffsetDateTime::UNIX_EPOCH; } else {
272 cursor = tenants_to_check[0].created_at;
273 }
274
275 for tenant in tenants_to_check {
276 let result =
277 index_for_tenant(repo.clone(), search_engine.clone(), &tenant.id).await;
278
279 if let Err(_error) = result {
280 tracing::error!(
281 "Failed to index tenant: '{}' cause: '{:?}'",
282 &tenant.id,
283 _error
284 );
285 }
286 }
287 } else if let Err(error) = tenants_to_check {
288 tracing::error!("Failed to retrieve tenants: {:?}", error);
289 }
290
291 if k != *TOTAL_INDEXED.lock().await {
292 k = *TOTAL_INDEXED.lock().await;
293 tracing::info!("TOTAL INDEXED SO FAR: {}", k);
294 }
295 }
296}