import { getDecoder, preferWorker } from './compression/index.js';
import create from './worker/create.js';
const defaultPoolSize = typeof navigator !== 'undefined' ? (navigator.hardwareConcurrency || 2) : 2;
/**
* @module pool
*/
/**
* Wrapper for a worker that can submit jobs to the worker and receive responses.
*/
class WorkerWrapper {
/**
* @param {Worker} worker the worker to wrap
*/
constructor(worker) {
this.worker = worker;
this.worker.addEventListener('message', (e) => this._onWorkerMessage(e));
this.jobIdCounter = 0;
this.jobs = new Map();
}
/**
* Get a new job id
* @returns {Number} the new job id
*/
newJobId() {
return this.jobIdCounter++;
}
/**
* Get the number of jobs currently running
* @returns {Number} the number of jobs currently running
*/
getJobCount() {
return this.jobs.size;
}
_onWorkerMessage(e) {
const { jobId, error, ...result } = e.data;
const job = this.jobs.get(jobId);
this.jobs.delete(jobId);
if (error) {
job.reject(new Error(error));
} else {
job.resolve(result);
}
}
/**
* Submit a job to the worker
* @param {Object} message the message to send to the worker. A "jobId" property will be added to this object.
* @param {Object[]} [transferables] an optional array of transferable objects to transfer to the worker.
* @returns {Promise} a promise that gets resolved/rejected when a message with the same jobId is received from the worker.
*/
submitJob(message, transferables = undefined) {
const jobId = this.newJobId();
let resolve;
let reject;
const promise = new Promise((_resolve, _reject) => {
resolve = _resolve;
reject = _reject;
});
this.jobs.set(jobId, { resolve, reject });
this.worker.postMessage({ ...message, jobId }, transferables);
return promise;
}
terminate() {
this.worker.terminate();
}
}
const finalizationRegistry = new FinalizationRegistry((worker) => {
worker.terminate();
});
/**
* Pool for workers to decode chunks of the images.
*/
class Pool {
/**
* @constructor
* @param {Number} [size] The size of the pool. Defaults to the number of CPUs
* available. When this parameter is `null` or 0, then the
* decoding will be done in the main thread.
* @param {function(): Worker} [createWorker] A function that creates the decoder worker.
* Defaults to a worker with all decoders that ship with geotiff.js. The `createWorker()`
* function is expected to return a `Worker` compatible with Web Workers. For code that
* runs in Node, [web-worker](https://www.npmjs.com/package/web-worker) is a good choice.
*
* A worker that uses a custom lzw decoder would look like this `my-custom-worker.js` file:
* ```js
* import { addDecoder, getDecoder } from 'geotiff';
* addDecoder(5, () => import ('./my-custom-lzw').then((m) => m.default));
* self.addEventListener('message', async (e) => {
* const { id, fileDirectory, buffer } = e.data;
* const decoder = await getDecoder(fileDirectory);
* const decoded = await decoder.decode(fileDirectory, buffer);
* self.postMessage({ decoded, id }, [decoded]);
* });
* ```
* The way the above code is built into a worker by the `createWorker()` function
* depends on the used bundler. For most bundlers, something like this will work:
* ```js
* function createWorker() {
* return new Worker(new URL('./my-custom-worker.js', import.meta.url));
* }
* ```
*/
constructor(size = defaultPoolSize, createWorker = create) {
this.workerWrappers = null;
if (size) {
this.workerWrappers = (async () => {
const workerWrappers = [];
for (let i = 0; i < size; i++) {
const worker = createWorker();
const wrapper = new WorkerWrapper(worker);
workerWrappers.push(wrapper);
finalizationRegistry.register(wrapper, worker, wrapper);
}
return workerWrappers;
})();
}
}
/**
* Decode the given block of bytes with the set compression method.
* @param {ArrayBuffer} buffer the array buffer of bytes to decode.
* @returns {Promise<ArrayBuffer>} the decoded result as a `Promise`
*/
async decode(fileDirectory, buffer) {
if (preferWorker(fileDirectory) && this.workerWrappers) {
// select the worker with the lowest jobCount
const workerWrapper = (await this.workerWrappers).reduce((a, b) => {
return a.getJobCount() < b.getJobCount() ? a : b;
});
const { decoded } = await workerWrapper.submitJob({ fileDirectory, buffer }, [buffer]);
return decoded;
} else {
return getDecoder(fileDirectory).then((decoder) => decoder.decode(fileDirectory, buffer));
}
}
async destroy() {
if (this.workerWrappers) {
(await this.workerWrappers).forEach((worker) => {
worker.terminate();
});
this.workerWrappers = null;
}
}
}
export default Pool;