package Utils;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
public interface ISliceRunner
{
void runSlices(ISliceCompute work, int maxWorkers, int length, int largestAtom) throws ExecutionException;
public class Series implements ISliceRunner
{
@Override
public void runSlices(ISliceCompute work, int maxWorkers, int length, int largestAtom) throws ExecutionException
{
work.initSlices(1);
ISliceCompute.Result res = new ISliceCompute.Result();
res.idx = 0;
try {
res.result = work.computeSlice(0, 0, length);
}
catch (Throwable t) {
throw new ExecutionException(t);
}
work.finishSlice(res);
}
}
public class Parallel implements ISliceRunner
{
public ThreadPoolExecutor threadPool;
public Parallel(ThreadPoolExecutor threadPool)
{
this.threadPool = threadPool;
}
@Override
public void runSlices(ISliceCompute work, int maxWorkers, int length, int largestAtom) throws ExecutionException
{
work.initSlices(countSlices(maxWorkers, length, largestAtom));
final int nWorkers = Math.max(maxWorkers, 1);
final int atomSize = Math.max(largestAtom, 1);
final int workJump = nWorkers * atomSize;
LinkedBlockingQueue<ISliceCompute.Result> resultQueue = new LinkedBlockingQueue<>();
int offset = 0;
int nSlices = 0;
for (int i = 0; i < maxWorkers; i++) {
final int off = offset;
threadPool.submit(() -> {
for (int j = off; j < length; j += workJump) {
int idx = j / atomSize;
int chunkSize = Math.min(atomSize, length - j);
ISliceCompute.Result res = new ISliceCompute.Result();
res.idx = idx;
try {
res.result = work.computeSlice(idx, j, chunkSize);
}
catch (Throwable ex) {
res.ex = ex;
}
finally {
resultQueue.add(res);
}
}
});
nSlices += (length - offset + workJump - 1) / workJump;
offset += atomSize;
if (offset >= length)
break;
}
for (int i = 0; i < nSlices; i++) {
ISliceCompute.Result res;
try {
res = resultQueue.take();
}
catch (InterruptedException ex) {
throw new ExecutionException(new Exception("Interrupted"));
}
if (res.ex != null)
throw new ExecutionException(res.ex);
work.finishSlice(res);
}
}
private static int countSlices(int maxWorkers, int length, int largestAtom)
{
final int nWorkers = Math.max(maxWorkers, 1);
final int atomSize = Math.max(largestAtom, 1);
final int workJump = nWorkers * atomSize;
int nSlices = 0;
int offset = 0;
for (int i = 0; i < maxWorkers; i++) {
nSlices += (length - offset + workJump - 1) / workJump;
offset += atomSize;
if (offset >= length)
break;
}
return nSlices;
}
}
}