001/** 002 * 003 * Copyright (c) 2014, the Railo Company Ltd. All rights reserved. 004 * 005 * This library is free software; you can redistribute it and/or 006 * modify it under the terms of the GNU Lesser General Public 007 * License as published by the Free Software Foundation; either 008 * version 2.1 of the License, or (at your option) any later version. 009 * 010 * This library is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 013 * Lesser General Public License for more details. 014 * 015 * You should have received a copy of the GNU Lesser General Public 016 * License along with this library. If not, see <http://www.gnu.org/licenses/>. 017 * 018 **/ 019package lucee.runtime.spooler; 020 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.ObjectInputStream; 024import java.io.ObjectOutputStream; 025import java.util.ArrayList; 026import java.util.Collections; 027import java.util.Iterator; 028import java.util.LinkedList; 029import java.util.List; 030 031import lucee.commons.io.IOUtil; 032import lucee.commons.io.SystemUtil; 033import lucee.commons.io.log.Log; 034import lucee.commons.io.log.LogUtil; 035import lucee.commons.io.res.Resource; 036import lucee.commons.io.res.filter.ResourceNameFilter; 037import lucee.commons.io.res.util.ResourceUtil; 038import lucee.commons.lang.ExceptionUtil; 039import lucee.commons.lang.SerializableObject; 040import lucee.commons.lang.StringUtil; 041import lucee.runtime.config.Config; 042import lucee.runtime.engine.ThreadLocalConfig; 043import lucee.runtime.exp.DatabaseException; 044import lucee.runtime.exp.PageException; 045import lucee.runtime.op.Caster; 046import lucee.runtime.op.Duplicator; 047import lucee.runtime.type.Array; 048import lucee.runtime.type.Collection; 049import lucee.runtime.type.KeyImpl; 050import lucee.runtime.type.Query; 051import lucee.runtime.type.QueryImpl; 052import lucee.runtime.type.Struct; 053import lucee.runtime.type.dt.DateTimeImpl; 054import lucee.runtime.type.util.ArrayUtil; 055import lucee.runtime.type.util.CollectionUtil; 056import lucee.runtime.type.util.KeyConstants; 057 058public class SpoolerEngineImpl implements SpoolerEngine { 059 060 private static final TaskFileFilter FILTER=new TaskFileFilter(); 061 062 private static final Collection.Key LAST_EXECUTION = KeyImpl.intern("lastExecution"); 063 private static final Collection.Key NEXT_EXECUTION = KeyImpl.intern("nextExecution"); 064 065 private static final Collection.Key CLOSED = KeyImpl.intern("closed"); 066 private static final Collection.Key TRIES = KeyImpl.intern("tries"); 067 private static final Collection.Key TRIES_MAX = KeyImpl.intern("triesmax"); 068 069 070 private String label; 071 072 073 //private LinkedList<SpoolerTask> openTaskss=new LinkedList<SpoolerTask>(); 074 //private LinkedList<SpoolerTask> closedTasks=new LinkedList<SpoolerTask>(); 075 private SimpleThread simpleThread; 076 private SerializableObject token=new SerializableObject(); 077 private SpoolerThread thread; 078 //private ExecutionPlan[] plans; 079 private Resource persisDirectory; 080 private long count=0; 081 private Log log; 082 private Config config; 083 private int add=0; 084 085 086 private Resource closedDirectory; 087 private Resource openDirectory; 088 089 private int maxThreads; 090 091 public SpoolerEngineImpl(Config config,Resource persisDirectory,String label, Log log, int maxThreads) { 092 this.config=config; 093 this.persisDirectory=persisDirectory; 094 095 closedDirectory = persisDirectory.getRealResource("closed"); 096 openDirectory = persisDirectory.getRealResource("open"); 097 //calculateSize(); 098 099 100 this.maxThreads=maxThreads; 101 this.label=label; 102 this.log=log; 103 //print.ds(persisDirectory.getAbsolutePath()); 104 //load(); 105 if(getOpenTaskCount()>0)start(); 106 } 107 108 /*private void calculateSize() { 109 closedCount=calculateSize(closedDirectory); 110 openCount=calculateSize(openDirectory); 111 }*/ 112 113 public void setMaxThreads(int maxThreads) { 114 this.maxThreads = maxThreads; 115 } 116 117 /** 118 * @return the maxThreads 119 */ 120 public int getMaxThreads() { 121 return maxThreads; 122 } 123 124 private int calculateSize(Resource res) { 125 return ResourceUtil.directrySize(res,FILTER); 126 } 127 128 @Override 129 public synchronized void add(SpoolerTask task) { 130 // if there is no plan execute and forget 131 if(task.getPlans()==null) { 132 if(task instanceof Task) 133 start((Task)task); 134 else { 135 start(new TaskWrap(task)); 136 log.error("spooler", "make class "+task.getClass().getName()+" a Task class"); 137 } 138 return; 139 } 140 141 //openTasks.add(task); 142 add++; 143 if (task.nextExecution() == 0) 144 task.setNextExecution(System.currentTimeMillis()); 145 task.setId(createId(task)); 146 store(task); 147 start(); 148 } 149 150 // add to interface 151 public void add(Task task) { 152 start(task); 153 } 154 155 156 private void start(Task task) { 157 if(task==null) return; 158 synchronized (task) { 159 if(simpleThread==null || !simpleThread.isAlive()) { 160 simpleThread=new SimpleThread(config,task); 161 162 simpleThread.setPriority(Thread.MIN_PRIORITY); 163 simpleThread.start(); 164 } 165 else { 166 simpleThread.tasks.add(task); 167 simpleThread.interrupt(); 168 } 169 } 170 } 171 172 private void start() { 173 if(thread==null || !thread.isAlive()) { 174 thread=new SpoolerThread(this); 175 thread.setPriority(Thread.MIN_PRIORITY); 176 thread.start(); 177 } 178 else if(thread.sleeping) { 179 thread.interrupt(); 180 } 181 } 182 183 @Override 184 public String getLabel() { 185 return label; 186 } 187 188 189 190 private SpoolerTask getTaskById(Resource dir,String id) { 191 return getTask(dir.getRealResource(id+".tsk"),null); 192 } 193 194 private SpoolerTask getTaskByName(Resource dir,String name) { 195 return getTask(dir.getRealResource(name),null); 196 } 197 198 private SpoolerTask getTask(Resource res, SpoolerTask defaultValue) { 199 InputStream is = null; 200 ObjectInputStream ois = null; 201 202 SpoolerTask task=defaultValue; 203 try { 204 is = res.getInputStream(); 205 ois = new ObjectInputStream(is); 206 task = (SpoolerTask) ois.readObject(); 207 } 208 catch (Throwable t) { 209 ExceptionUtil.rethrowIfNecessary(t); 210 IOUtil.closeEL(is); 211 IOUtil.closeEL(ois); 212 res.delete(); 213 } 214 IOUtil.closeEL(is); 215 IOUtil.closeEL(ois); 216 return task; 217 } 218 219 private void store(SpoolerTask task) { 220 ObjectOutputStream oos=null; 221 Resource persis = getFile(task); 222 if(persis.exists()) persis.delete(); 223 try { 224 oos = new ObjectOutputStream(persis.getOutputStream()); 225 oos.writeObject(task); 226 } 227 catch (IOException e) {e.printStackTrace();} 228 finally { 229 IOUtil.closeEL(oos); 230 } 231 } 232 233 private void unstore(SpoolerTask task) { 234 Resource persis = getFile(task); 235 boolean exists=persis.exists(); 236 if(exists) persis.delete(); 237 } 238 private Resource getFile(SpoolerTask task) { 239 Resource dir = persisDirectory.getRealResource(task.closed()?"closed":"open"); 240 dir.mkdirs(); 241 return dir.getRealResource(task.getId()+".tsk"); 242 } 243 244 private String createId(SpoolerTask task) { 245 Resource dir = persisDirectory.getRealResource(task.closed()?"closed":"open"); 246 dir.mkdirs(); 247 248 String id=null; 249 do{ 250 id=StringUtil.addZeros(++count, 8); 251 }while(dir.getRealResource(id+".tsk").exists()); 252 return id; 253 } 254 255 public long calculateNextExecution(SpoolerTask task) { 256 int _tries=0; 257 ExecutionPlan plan=null; 258 ExecutionPlan[] plans=task.getPlans(); 259 260 for(int i=0;i<plans.length;i++) { 261 _tries+=plans[i].getTries(); 262 if(_tries>task.tries()) { 263 plan=plans[i]; 264 break; 265 } 266 } 267 if(plan==null)return -1; 268 return task.lastExecution()+(plan.getIntervall()*1000); 269 } 270 271 public Query getOpenTasksAsQuery(int startrow, int maxrow) throws PageException { 272 return getTasksAsQuery(createQuery(),openDirectory,startrow, maxrow); 273 } 274 275 public Query getClosedTasksAsQuery(int startrow, int maxrow) throws PageException { 276 return getTasksAsQuery(createQuery(),closedDirectory,startrow, maxrow); 277 } 278 279 public Query getAllTasksAsQuery(int startrow, int maxrow) throws PageException { 280 Query query = createQuery(); 281 //print.o(startrow+":"+maxrow); 282 getTasksAsQuery(query,openDirectory,startrow, maxrow); 283 int records = query.getRecordcount(); 284 if(maxrow<0) maxrow=Integer.MAX_VALUE; 285 // no open tasks 286 if(records==0) { 287 startrow-=getOpenTaskCount(); 288 if(startrow<1) startrow=1; 289 } 290 else { 291 startrow=1; 292 maxrow-=records; 293 } 294 if(maxrow>0)getTasksAsQuery(query,closedDirectory,startrow, maxrow); 295 return query; 296 } 297 298 public int getOpenTaskCount() { 299 return calculateSize(openDirectory); 300 } 301 302 public int getClosedTaskCount() { 303 return calculateSize(closedDirectory); 304 } 305 306 307 private Query getTasksAsQuery(Query qry,Resource dir, int startrow, int maxrow) { 308 String[] children = dir.list(FILTER); 309 if(ArrayUtil.isEmpty(children)) return qry; 310 if(children.length<maxrow)maxrow=children.length; 311 SpoolerTask task; 312 313 int to=startrow+maxrow; 314 if(to>children.length)to=children.length; 315 if(startrow<1)startrow=1; 316 317 for(int i=startrow-1;i<to;i++){ 318 task = getTaskByName(dir, children[i]); 319 if(task!=null)addQueryRow(qry, task); 320 } 321 322 return qry; 323 } 324 325 private Query createQuery() throws DatabaseException { 326 String v="VARCHAR"; 327 String d="DATE"; 328 lucee.runtime.type.Query qry=new QueryImpl( 329 new String[]{"type","name","detail","id","lastExecution","nextExecution","closed","tries","exceptions","triesmax"}, 330 new String[]{v,v,"object",v,d,d,"boolean","int","object","int"}, 331 0,"query"); 332 return qry; 333 } 334 335 private void addQueryRow(lucee.runtime.type.Query qry, SpoolerTask task) { 336 int row = qry.addRow(); 337 try{ 338 qry.setAt(KeyConstants._type, row, task.getType()); 339 qry.setAt(KeyConstants._name, row, task.subject()); 340 qry.setAt(KeyConstants._detail, row, task.detail()); 341 qry.setAt(KeyConstants._id, row, task.getId()); 342 343 344 qry.setAt(LAST_EXECUTION, row,new DateTimeImpl(task.lastExecution(),true)); 345 qry.setAt(NEXT_EXECUTION, row,new DateTimeImpl(task.nextExecution(),true)); 346 qry.setAt(CLOSED, row,Caster.toBoolean(task.closed())); 347 qry.setAt(TRIES, row,Caster.toDouble(task.tries())); 348 qry.setAt(TRIES_MAX, row,Caster.toDouble(task.tries())); 349 qry.setAt(KeyConstants._exceptions, row,translateTime(task.getExceptions())); 350 351 int triesMax=0; 352 ExecutionPlan[] plans = task.getPlans(); 353 for(int y=0;y<plans.length;y++) { 354 triesMax+=plans[y].getTries(); 355 } 356 qry.setAt(TRIES_MAX, row,Caster.toDouble(triesMax)); 357 } 358 catch(Throwable t){ 359 ExceptionUtil.rethrowIfNecessary(t); 360 } 361 } 362 363 private Array translateTime(Array exp) { 364 exp=(Array) Duplicator.duplicate(exp,true); 365 Iterator<Object> it = exp.valueIterator(); 366 Struct sct; 367 while(it.hasNext()) { 368 sct=(Struct) it.next(); 369 sct.setEL(KeyConstants._time,new DateTimeImpl(Caster.toLongValue(sct.get(KeyConstants._time,null),0),true)); 370 } 371 return exp; 372 } 373 374 class SimpleThread extends Thread { 375 376 // 'tasks' needs to be synchronized because the other thread will access this list. 377 // otherwise tasks.size() will not match the actual size of the server and NPEs 378 // and unlimited loops may result. 379 List<Task> tasks = Collections.synchronizedList(new LinkedList<Task>()); 380 private Config config; 381 382 public SimpleThread(Config config, Task task){ 383 this.config=config; 384 tasks.add(task); 385 } 386 387 388 public void run() { 389 Task task; 390 while(tasks.size()>0){ 391 try { 392 task = CollectionUtil.remove(tasks,0,null); 393 if(task!=null)task.execute(config); 394 } 395 catch (Throwable t) { 396 ExceptionUtil.rethrowIfNecessary(t); 397 } 398 } 399 } 400 401 } 402 403 class SpoolerThread extends Thread { 404 405 private SpoolerEngineImpl engine; 406 private boolean sleeping; 407 private final int maxThreads; 408 409 public SpoolerThread(SpoolerEngineImpl engine) { 410 this.maxThreads=engine.getMaxThreads(); 411 this.engine=engine; 412 try{ 413 this.setPriority(MIN_PRIORITY); 414 } 415 // can throw security exceptions 416 catch(Throwable t){ 417 ExceptionUtil.rethrowIfNecessary(t); 418 } 419 } 420 421 public void run() { 422 String[] taskNames; 423 //SpoolerTask[] tasks; 424 SpoolerTask task=null; 425 long nextExection; 426 ThreadLocalConfig.register(engine.config); 427 //ThreadLocalPageContext.register(engine.); 428 List<TaskThread> runningTasks=new ArrayList<TaskThread>(); 429 TaskThread tt; 430 int adds; 431 432 while(getOpenTaskCount()>0) { 433 adds=engine.adds(); 434 taskNames = openDirectory.list(FILTER); 435 //tasks=engine.getOpenTasks(); 436 nextExection=Long.MAX_VALUE; 437 for(int i=0;i<taskNames.length;i++) { 438 task=getTaskByName(openDirectory, taskNames[i]); 439 if(task==null) continue; 440 441 if(task.nextExecution()<=System.currentTimeMillis()) { 442 //print.o("- execute " + task.getId()); 443 tt=new TaskThread(engine,task); 444 tt.start(); 445 runningTasks.add(tt); 446 } 447 else if(task.nextExecution()<nextExection && 448 nextExection!=-1 && 449 !task.closed()) 450 nextExection=task.nextExecution(); 451 nextExection=joinTasks(runningTasks,maxThreads,nextExection); 452 } 453 454 nextExection=joinTasks(runningTasks,0,nextExection); 455 if(adds!=engine.adds()) continue; 456 457 if(nextExection==Long.MAX_VALUE)break; 458 long sleep = nextExection-System.currentTimeMillis(); 459 460 //print.o("sleep:"+sleep+">"+(sleep/1000)); 461 if(sleep>0)doWait(sleep); 462 463 //if(sleep<0)break; 464 } 465 //print.o("end:"+getOpenTaskCount()); 466 } 467 468 private long joinTasks(List<TaskThread> runningTasks, int maxThreads,long nextExection) { 469 if(runningTasks.size()>=maxThreads){ 470 Iterator<TaskThread> it = runningTasks.iterator(); 471 TaskThread tt; 472 SpoolerTask task; 473 while(it.hasNext()){ 474 tt = it.next(); 475 SystemUtil.join(tt); 476 task = tt.getTask(); 477 478 if(task!=null && task.nextExecution()!=-1 && task.nextExecution()<nextExection && !task.closed()) { 479 nextExection=task.nextExecution(); 480 } 481 } 482 runningTasks.clear(); 483 } 484 return nextExection; 485 } 486 487 private void doWait(long sleep) { 488 //long start=System.currentTimeMillis(); 489 try { 490 sleeping=true; 491 synchronized (this) { 492 wait(sleep); 493 } 494 495 } catch (Throwable t) { 496 ExceptionUtil.rethrowIfNecessary(t); 497 } 498 finally { 499 sleeping=false; 500 } 501 } 502 503 } 504 505 506 class TaskThread extends Thread { 507 508 private SpoolerEngineImpl engine; 509 private SpoolerTask task; 510 511 public TaskThread(SpoolerEngineImpl engine,SpoolerTask task) { 512 this.engine=engine; 513 this.task=task; 514 } 515 516 public SpoolerTask getTask() { 517 return task; 518 } 519 520 public void run() { 521 ThreadLocalConfig.register(engine.config); 522 engine.execute(task); 523 ThreadLocalConfig.release(); 524 525 } 526 } 527 528 529 /** 530 * remove that task from Spooler 531 * @param task 532 */ 533 public void remove(SpoolerTask task) { 534 unstore(task); 535 //if(!openTasks.remove(task))closedTasks.remove(task); 536 } 537 538 public void removeAll() { 539 ResourceUtil.removeChildrenEL(openDirectory); 540 ResourceUtil.removeChildrenEL(closedDirectory); 541 SystemUtil.sleep(100); 542 ResourceUtil.removeChildrenEL(openDirectory); 543 ResourceUtil.removeChildrenEL(closedDirectory); 544 } 545 546 public int adds() { 547 //return openTasks.size()>0; 548 return add; 549 } 550 551 @Override 552 public void remove(String id) { 553 SpoolerTask task = getTaskById(openDirectory,id); 554 if(task==null)task=getTaskById(closedDirectory,id); 555 if(task!=null)remove(task); 556 } 557 558 /*private SpoolerTask getTaskById(SpoolerTask[] tasks, String id) { 559 for(int i=0;i<tasks.length;i++) { 560 if(tasks[i].getId().equals(id)) { 561 return tasks[i]; 562 } 563 } 564 return null; 565 }*/ 566 567 /** 568 * execute task by id and return eror throwd by task 569 * @param id 570 */ 571 public PageException execute(String id) { 572 SpoolerTask task = getTaskById(openDirectory,id); 573 if(task==null)task=getTaskById(closedDirectory,id); 574 if(task!=null){ 575 return execute(task); 576 } 577 return null; 578 } 579 580 public PageException execute(SpoolerTask task) { 581 //task.closed(); 582 try { 583 if(task instanceof SpoolerTaskSupport) // FUTURE this is bullshit, call the execute method directly, but you have to rewrite them for that 584 ((SpoolerTaskSupport)task)._execute(config); 585 else 586 task.execute(config); 587 588 unstore(task); 589 log.log(Log.LEVEL_INFO,"remote-client", task.subject()); 590 task.setLastExecution(System.currentTimeMillis()); 591 task.setNextExecution(-1); 592 593 task.setClosed(true); 594 task=null; 595 } 596 catch(Throwable t) { 597 ExceptionUtil.rethrowIfNecessary(t); 598 task.setLastExecution(System.currentTimeMillis()); 599 task.setNextExecution(calculateNextExecution(task)); 600 LogUtil.log(log,Log.LEVEL_ERROR,"remote-client", task.subject(),t); 601 if(task.nextExecution()==-1) { 602 //openTasks.remove(task); 603 //if(!closedTasks.contains(task))closedTasks.add(task); 604 unstore(task); 605 task.setClosed(true); 606 store(task); 607 task=null; 608 } 609 else 610 store(task); 611 612 return Caster.toPageException(t); 613 } 614 return null; 615 } 616 617 public void setLabel(String label) { 618 this.label = label; 619 } 620 621 public void setPersisDirectory(Resource persisDirectory) { 622 this.persisDirectory = persisDirectory; 623 } 624 625 public void setLog(Log log) { 626 this.log = log; 627 } 628 629 public void setConfig(Config config) { 630 this.config = config; 631 } 632 633} 634 635class TaskFileFilter implements ResourceNameFilter { 636 637 public boolean accept(Resource parent, String name) { 638 return name!=null && name.endsWith(".tsk"); 639 } 640 641}