001 package railo.runtime.spooler; 002 003 import java.io.IOException; 004 import java.io.InputStream; 005 import java.io.ObjectInputStream; 006 import java.io.ObjectOutputStream; 007 import java.util.ArrayList; 008 import java.util.Iterator; 009 import java.util.List; 010 011 import railo.commons.io.IOUtil; 012 import railo.commons.io.SystemUtil; 013 import railo.commons.io.log.Log; 014 import railo.commons.io.res.Resource; 015 import railo.commons.io.res.filter.ResourceNameFilter; 016 import railo.commons.io.res.util.ResourceUtil; 017 import railo.commons.lang.StringUtil; 018 import railo.runtime.config.Config; 019 import railo.runtime.engine.ThreadLocalConfig; 020 import railo.runtime.exp.ApplicationException; 021 import railo.runtime.exp.DatabaseException; 022 import railo.runtime.exp.PageException; 023 import railo.runtime.exp.PageRuntimeException; 024 import railo.runtime.op.Caster; 025 import railo.runtime.type.Array; 026 import railo.runtime.type.Collection; 027 import railo.runtime.type.KeyImpl; 028 import railo.runtime.type.Query; 029 import railo.runtime.type.QueryImpl; 030 import railo.runtime.type.Struct; 031 import railo.runtime.type.dt.DateTimeImpl; 032 import railo.runtime.type.util.ArrayUtil; 033 034 public class SpoolerEngineImpl implements SpoolerEngine { 035 036 private static final TaskFileFilter FILTER=new TaskFileFilter(); 037 038 private static final Collection.Key LAST_EXECUTION = KeyImpl.intern("lastExecution"); 039 private static final Collection.Key NEXT_EXECUTION = KeyImpl.intern("nextExecution"); 040 041 private static final Collection.Key CLOSED = KeyImpl.intern("closed"); 042 private static final Collection.Key TRIES = KeyImpl.intern("tries"); 043 private static final Collection.Key TRIES_MAX = KeyImpl.intern("triesmax"); 044 045 046 private String label; 047 048 049 //private LinkedList<SpoolerTask> openTaskss=new LinkedList<SpoolerTask>(); 050 //private LinkedList<SpoolerTask> closedTasks=new LinkedList<SpoolerTask>(); 051 private SpoolerThread thread; 052 //private ExecutionPlan[] plans; 053 private Resource persisDirectory; 054 private long count=0; 055 private Log log; 056 private Config config; 057 private int add=0; 058 059 060 private Resource closedDirectory; 061 private Resource openDirectory; 062 063 private int maxThreads; 064 065 public SpoolerEngineImpl(Config config,Resource persisDirectory,String label, Log log, int maxThreads) throws IOException { 066 this.config=config; 067 this.persisDirectory=persisDirectory; 068 069 closedDirectory = persisDirectory.getRealResource("closed"); 070 openDirectory = persisDirectory.getRealResource("open"); 071 //calculateSize(); 072 073 074 this.maxThreads=maxThreads; 075 this.label=label; 076 this.log=log; 077 //print.ds(persisDirectory.getAbsolutePath()); 078 //load(); 079 if(getOpenTaskCount()>0)start(); 080 } 081 082 /*private void calculateSize() { 083 closedCount=calculateSize(closedDirectory); 084 openCount=calculateSize(openDirectory); 085 }*/ 086 087 /** 088 * @return the maxThreads 089 */ 090 public int getMaxThreads() { 091 return maxThreads; 092 } 093 094 private int calculateSize(Resource res) { 095 return ResourceUtil.directrySize(res,FILTER); 096 } 097 098 /** 099 * @see railo.runtime.spooler.SpoolerEngine#add(railo.runtime.spooler.SpoolerTask) 100 */ 101 public synchronized void add(SpoolerTask task) { 102 //openTasks.add(task); 103 add++; 104 task.setNextExecution(System.currentTimeMillis()); 105 task.setId(createId(task)); 106 store(task); 107 start(); 108 } 109 110 111 private void start() { 112 if(thread==null || !thread.isAlive()) { 113 thread=new SpoolerThread(this); 114 thread.setPriority(Thread.MIN_PRIORITY); 115 thread.start(); 116 } 117 else if(thread.sleeping) { 118 thread.interrupt(); 119 } 120 //else print.out("- existing"); 121 } 122 123 /** 124 * @see railo.runtime.spooler.SpoolerEngine#getLabel() 125 */ 126 public String getLabel() { 127 return label; 128 } 129 130 131 132 private SpoolerTask getTaskById(Resource dir,String id) { 133 return getTask(dir.getRealResource(id+".tsk"),null); 134 } 135 136 private SpoolerTask getTaskByName(Resource dir,String name) { 137 return getTask(dir.getRealResource(name),null); 138 } 139 140 private SpoolerTask getTask(Resource res, SpoolerTask defaultValue) { 141 InputStream is = null; 142 ObjectInputStream ois = null; 143 144 SpoolerTask task=defaultValue; 145 try { 146 is = res.getInputStream(); 147 ois = new ObjectInputStream(is); 148 task = (SpoolerTask) ois.readObject(); 149 } 150 catch (Throwable t) {//t.printStackTrace(); 151 IOUtil.closeEL(is); 152 IOUtil.closeEL(ois); 153 res.delete(); 154 } 155 IOUtil.closeEL(is); 156 IOUtil.closeEL(ois); 157 return task; 158 } 159 160 private void store(SpoolerTask task) { 161 ObjectOutputStream oos=null; 162 Resource persis = getFile(task); 163 if(persis.exists()) persis.delete(); 164 try { 165 oos = new ObjectOutputStream(persis.getOutputStream()); 166 oos.writeObject(task); 167 } 168 catch (IOException e) {} 169 finally { 170 IOUtil.closeEL(oos); 171 } 172 } 173 174 private void unstore(SpoolerTask task) { 175 Resource persis = getFile(task); 176 boolean exists=persis.exists(); 177 if(exists) persis.delete(); 178 } 179 private Resource getFile(SpoolerTask task) { 180 Resource dir = persisDirectory.getRealResource(task.closed()?"closed":"open"); 181 dir.mkdirs(); 182 return dir.getRealResource(task.getId()+".tsk"); 183 } 184 185 private String createId(SpoolerTask task) { 186 Resource dir = persisDirectory.getRealResource(task.closed()?"closed":"open"); 187 dir.mkdirs(); 188 189 String id=null; 190 do{ 191 id=StringUtil.addZeros(++count, 8); 192 }while(dir.getRealResource(id+".tsk").exists()); 193 return id; 194 } 195 196 197 /** 198 * @see railo.runtime.spooler.SpoolerEngine#calculateNextExecution(railo.runtime.spooler.SpoolerTask) 199 */ 200 public long calculateNextExecution(SpoolerTask task) { 201 int _tries=0; 202 ExecutionPlan plan=null; 203 ExecutionPlan[] plans=task.getPlans(); 204 205 for(int i=0;i<plans.length;i++) { 206 _tries+=plans[i].getTries(); 207 if(_tries>task.tries()) { 208 plan=plans[i]; 209 break; 210 } 211 } 212 if(plan==null)return -1; 213 return task.lastExecution()+(plan.getIntervall()*1000); 214 } 215 216 /** 217 * @see railo.runtime.spooler.SpoolerEngine#getOpenTasks() 218 */ 219 public SpoolerTask[] getOpenTasks() { 220 throw new PageRuntimeException(new ApplicationException("this method is no longer supported")); 221 } 222 223 /** 224 * @see railo.runtime.spooler.SpoolerEngine#getClosedTasks() 225 */ 226 public SpoolerTask[] getClosedTasks() { 227 throw new PageRuntimeException(new ApplicationException("this method is no longer supported")); 228 } 229 230 // FUTURE add to interface 231 public Query getOpenTasksAsQuery(int startrow, int maxrow) throws PageException { 232 return getTasksAsQuery(createQuery(),openDirectory,startrow, maxrow); 233 } 234 235 public Query getClosedTasksAsQuery(int startrow, int maxrow) throws PageException { 236 return getTasksAsQuery(createQuery(),closedDirectory,startrow, maxrow); 237 } 238 239 public Query getAllTasksAsQuery(int startrow, int maxrow) throws PageException { 240 Query query = createQuery(); 241 //print.o(startrow+":"+maxrow); 242 getTasksAsQuery(query,openDirectory,startrow, maxrow); 243 int records = query.getRecordcount(); 244 if(maxrow<0) maxrow=Integer.MAX_VALUE; 245 // no open tasks 246 if(records==0) { 247 startrow-=getOpenTaskCount(); 248 if(startrow<1) startrow=1; 249 } 250 else { 251 startrow=1; 252 maxrow-=records; 253 } 254 if(maxrow>0)getTasksAsQuery(query,closedDirectory,startrow, maxrow); 255 return query; 256 } 257 258 public int getOpenTaskCount() { 259 return calculateSize(openDirectory); 260 } 261 262 public int getClosedTaskCount() { 263 return calculateSize(closedDirectory); 264 } 265 266 267 private Query getTasksAsQuery(Query qry,Resource dir, int startrow, int maxrow) throws PageException { 268 String[] children = dir.list(FILTER); 269 if(ArrayUtil.isEmpty(children)) return qry; 270 if(children.length<maxrow)maxrow=children.length; 271 SpoolerTask task; 272 273 int to=startrow+maxrow; 274 if(to>children.length)to=children.length; 275 if(startrow<1)startrow=1; 276 277 for(int i=startrow-1;i<to;i++){ 278 task = getTaskByName(dir, children[i]); 279 if(task!=null)addQueryRow(qry, task); 280 } 281 282 return qry; 283 } 284 285 private Query createQuery() throws DatabaseException { 286 String v="VARCHAR"; 287 String d="DATE"; 288 railo.runtime.type.Query qry=new QueryImpl( 289 new String[]{"type","name","detail","id","lastExecution","nextExecution","closed","tries","exceptions","triesmax"}, 290 new String[]{v,v,"object",v,d,d,"boolean","int","object","int"}, 291 0,"query"); 292 return qry; 293 } 294 295 private void addQueryRow(railo.runtime.type.Query qry, SpoolerTask task) throws PageException { 296 int row = qry.addRow(); 297 try{ 298 qry.setAt(KeyImpl.TYPE, row, task.getType()); 299 qry.setAt(KeyImpl.NAME, row, task.subject()); 300 qry.setAt(KeyImpl.DETAIL, row, task.detail()); 301 qry.setAt(KeyImpl.ID, row, task.getId()); 302 303 304 qry.setAt(LAST_EXECUTION, row,new DateTimeImpl(task.lastExecution(),true)); 305 qry.setAt(NEXT_EXECUTION, row,new DateTimeImpl(task.nextExecution(),true)); 306 qry.setAt(CLOSED, row,Caster.toBoolean(task.closed())); 307 qry.setAt(TRIES, row,Caster.toDouble(task.tries())); 308 qry.setAt(TRIES_MAX, row,Caster.toDouble(task.tries())); 309 qry.setAt(KeyImpl.EXCEPTIONS, row,translateTime(task.getExceptions())); 310 311 int triesMax=0; 312 ExecutionPlan[] plans = task.getPlans(); 313 for(int y=0;y<plans.length;y++) { 314 triesMax+=plans[y].getTries(); 315 } 316 qry.setAt(TRIES_MAX, row,Caster.toDouble(triesMax)); 317 } 318 catch(Throwable t){} 319 } 320 321 private Array translateTime(Array exp) { 322 exp=(Array) exp.duplicate(true); 323 Iterator it = exp.iterator(); 324 Struct sct; 325 while(it.hasNext()) { 326 sct=(Struct) it.next(); 327 sct.setEL(KeyImpl.TIME,new DateTimeImpl(Caster.toLongValue(sct.get(KeyImpl.TIME,null),0),true)); 328 } 329 return exp; 330 } 331 332 333 /* * 334 * @see railo.runtime.spooler.SpoolerEngine#getOpenTasks() 335 336 public SpoolerTask[] getOpenTasks() { 337 if(openTasks.size()==0) return new SpoolerTask[0]; 338 return (SpoolerTask[]) openTasks.toArray(new SpoolerTask[openTasks.size()]); 339 } */ 340 341 /* * 342 * @see railo.runtime.spooler.SpoolerEngine#getClosedTasks() 343 344 public SpoolerTask[] getClosedTasks() { 345 if(closedTasks.size()==0) return new SpoolerTask[0]; 346 return (SpoolerTask[]) closedTasks.toArray(new SpoolerTask[closedTasks.size()]); 347 }*/ 348 349 350 /*public static void list(SpoolerTask[] tasks) { 351 for(int i=0;i<tasks.length;i++) { 352 aprint.out(tasks[i].subject()); 353 aprint.out("- last exe:"+tasks[i].lastExecution()); 354 aprint.out("- tries:"+tasks[i].tries()); 355 } 356 }*/ 357 358 class SpoolerThread extends Thread { 359 360 private SpoolerEngineImpl engine; 361 private boolean sleeping; 362 private final int maxThreads; 363 364 public SpoolerThread(SpoolerEngineImpl engine) { 365 this.maxThreads=engine.getMaxThreads(); 366 this.engine=engine; 367 try{ 368 this.setPriority(MIN_PRIORITY); 369 } 370 // can throw security exceptions 371 catch(Throwable t){} 372 } 373 374 public void run() { 375 String[] taskNames; 376 //SpoolerTask[] tasks; 377 SpoolerTask task=null; 378 long nextExection; 379 ThreadLocalConfig.register(engine.config); 380 //ThreadLocalPageContext.register(engine.); 381 List<TaskThread> runningTasks=new ArrayList<TaskThread>(); 382 TaskThread tt; 383 int adds; 384 385 while(getOpenTaskCount()>0) { 386 adds=engine.adds(); 387 taskNames = openDirectory.list(FILTER); 388 //tasks=engine.getOpenTasks(); 389 nextExection=Long.MAX_VALUE; 390 for(int i=0;i<taskNames.length;i++) { 391 task=getTaskByName(openDirectory, taskNames[i]); 392 if(task==null) continue; 393 394 if(task.nextExecution()<=System.currentTimeMillis()) { 395 //print.o("- execute"); 396 tt=new TaskThread(engine,task); 397 tt.start(); 398 runningTasks.add(tt); 399 } 400 else if(task.nextExecution()<nextExection && 401 nextExection!=-1 && 402 !task.closed()) 403 nextExection=task.nextExecution(); 404 nextExection=joinTasks(runningTasks,maxThreads,nextExection); 405 } 406 407 nextExection=joinTasks(runningTasks,0,nextExection); 408 if(adds!=engine.adds()) continue; 409 410 if(nextExection==Long.MAX_VALUE)break; 411 long sleep = nextExection-System.currentTimeMillis(); 412 413 //print.o("sleep:"+sleep+">"+(sleep/1000)); 414 if(sleep>0)doWait(sleep); 415 416 //if(sleep<0)break; 417 } 418 //print.o("end:"+getOpenTaskCount()); 419 } 420 421 private long joinTasks(List<TaskThread> runningTasks, int maxThreads,long nextExection) { 422 if(runningTasks.size()>=maxThreads){ 423 Iterator<TaskThread> it = runningTasks.iterator(); 424 TaskThread tt; 425 SpoolerTask task; 426 while(it.hasNext()){ 427 tt = it.next(); 428 SystemUtil.join(tt); 429 task = tt.getTask(); 430 431 if(task!=null && task.nextExecution()!=-1 && task.nextExecution()<nextExection && !task.closed()) { 432 nextExection=task.nextExecution(); 433 } 434 } 435 runningTasks.clear(); 436 } 437 return nextExection; 438 } 439 440 private void doWait(long sleep) { 441 //long start=System.currentTimeMillis(); 442 try { 443 sleeping=true; 444 synchronized (this) { 445 wait(sleep); 446 } 447 448 } catch (Throwable t) { 449 // 450 } 451 finally { 452 sleeping=false; 453 } 454 //print.out(sleep+":"+(System.currentTimeMillis()-start)); 455 } 456 457 } 458 459 460 class TaskThread extends Thread { 461 462 private SpoolerEngineImpl engine; 463 private SpoolerTask task; 464 465 public TaskThread(SpoolerEngineImpl engine,SpoolerTask task) { 466 this.engine=engine; 467 this.task=task; 468 } 469 470 public SpoolerTask getTask() { 471 return task; 472 } 473 474 public void run() { 475 ThreadLocalConfig.register(engine.config); 476 engine.execute(task); 477 ThreadLocalConfig.release(); 478 479 } 480 } 481 482 483 /** 484 * remove that task from Spooler 485 * @param task 486 */ 487 public void remove(SpoolerTask task) { 488 unstore(task); 489 //if(!openTasks.remove(task))closedTasks.remove(task); 490 } 491 492 public void removeAll() { 493 ResourceUtil.removeChildrenEL(openDirectory); 494 ResourceUtil.removeChildrenEL(closedDirectory); 495 SystemUtil.sleep(100); 496 ResourceUtil.removeChildrenEL(openDirectory); 497 ResourceUtil.removeChildrenEL(closedDirectory); 498 } 499 500 501 /* * 502 * @see railo.runtime.spooler.SpoolerEngine#hasAdds() 503 */ 504 public int adds() { 505 //return openTasks.size()>0; 506 return add; 507 } 508 509 /* * 510 * @see railo.runtime.spooler.SpoolerEngine#resetAdds() 511 * / 512 public void resetAdds() { 513 add=false; 514 }*/ 515 516 /** 517 * @see railo.runtime.spooler.SpoolerEngine#remove(java.lang.String) 518 */ 519 public void remove(String id) { 520 SpoolerTask task = getTaskById(openDirectory,id); 521 if(task==null)task=getTaskById(closedDirectory,id); 522 if(task!=null)remove(task); 523 } 524 525 /*private SpoolerTask getTaskById(SpoolerTask[] tasks, String id) { 526 for(int i=0;i<tasks.length;i++) { 527 if(tasks[i].getId().equals(id)) { 528 return tasks[i]; 529 } 530 } 531 return null; 532 }*/ 533 534 /** 535 * execute task by id and return eror throwd by task 536 * @param id 537 * @throws SpoolerException 538 */ 539 public PageException execute(String id) { 540 SpoolerTask task = getTaskById(openDirectory,id); 541 if(task==null)task=getTaskById(closedDirectory,id); 542 if(task!=null){ 543 return execute(task); 544 } 545 return null; 546 } 547 548 public PageException execute(SpoolerTask task) { 549 //task.closed(); 550 try { 551 ((SpoolerTaskSupport)task)._execute(config); 552 //if(task.closed())closedTasks.remove(task); 553 //else openTasks.remove(task); 554 555 unstore(task); 556 log.info("remote-client", task.subject()); 557 task.setLastExecution(System.currentTimeMillis()); 558 task.setNextExecution(-1); 559 560 task.setClosed(true); 561 task=null; 562 } 563 catch(Throwable t) { 564 task.setLastExecution(System.currentTimeMillis()); 565 task.setNextExecution(calculateNextExecution(task)); 566 log.error("remote-client", task.subject()+":"+t.getMessage()); 567 if(task.nextExecution()==-1) { 568 //openTasks.remove(task); 569 //if(!closedTasks.contains(task))closedTasks.add(task); 570 unstore(task); 571 task.setClosed(true); 572 store(task); 573 task=null; 574 } 575 else 576 store(task); 577 578 return Caster.toPageException(t); 579 } 580 return null; 581 } 582 583 /** 584 * @see railo.runtime.spooler.SpoolerEngine#setLabel(java.lang.String) 585 */ 586 public void setLabel(String label) { 587 this.label = label; 588 } 589 590 /** 591 * @see railo.runtime.spooler.SpoolerEngine#setPersisDirectory(railo.commons.io.res.Resource) 592 */ 593 public void setPersisDirectory(Resource persisDirectory) { 594 this.persisDirectory = persisDirectory; 595 } 596 597 /** 598 * @see railo.runtime.spooler.SpoolerEngine#setLog(railo.commons.io.log.Log) 599 */ 600 public void setLog(Log log) { 601 this.log = log; 602 } 603 604 /** 605 * @see railo.runtime.spooler.SpoolerEngine#setConfig(railo.runtime.config.Config) 606 */ 607 public void setConfig(Config config) { 608 this.config = config; 609 } 610 611 } 612 613 class TaskFileFilter implements ResourceNameFilter { 614 615 public boolean accept(Resource parent, String name) { 616 return name!=null && name.endsWith(".tsk"); 617 } 618 619 }