package railo.runtime.spooler;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import railo.commons.io.IOUtil;
import railo.commons.io.SystemUtil;
import railo.commons.io.log.Log;
import railo.commons.io.res.Resource;
import railo.commons.io.res.filter.ResourceNameFilter;
import railo.commons.io.res.util.ResourceUtil;
import railo.commons.lang.StringUtil;
import railo.runtime.config.Config;
import railo.runtime.engine.ThreadLocalConfig;
import railo.runtime.exp.DatabaseException;
import railo.runtime.exp.PageException;
import railo.runtime.op.Caster;
import railo.runtime.op.Duplicator;
import railo.runtime.type.Array;
import railo.runtime.type.Collection;
import railo.runtime.type.KeyImpl;
import railo.runtime.type.Query;
import railo.runtime.type.QueryImpl;
import railo.runtime.type.Struct;
import railo.runtime.type.dt.DateTimeImpl;
import railo.runtime.type.util.ArrayUtil;
import railo.runtime.type.util.KeyConstants;

/**
 * File-backed spooler: every task is serialized to a <code>.tsk</code> file inside an
 * "open" or "closed" sub directory of the persistence directory, and a background
 * {@link SpoolerThread} executes the open tasks when they are due.
 */
public class SpoolerEngineImpl implements SpoolerEngine {

    private static final TaskFileFilter FILTER = new TaskFileFilter();

    private static final Collection.Key LAST_EXECUTION = KeyImpl.intern("lastExecution");
    private static final Collection.Key NEXT_EXECUTION = KeyImpl.intern("nextExecution");

    private static final Collection.Key CLOSED = KeyImpl.intern("closed");
    private static final Collection.Key TRIES = KeyImpl.intern("tries");
    private static final Collection.Key TRIES_MAX = KeyImpl.intern("triesmax");

    private String label;

    private SpoolerThread thread;
    private Resource persisDirectory;
    // last numeric id handed out by createId
    private long count = 0;
    private Log log;
    private Config config;
    // incremented in the synchronized add(); read without a lock by the spooler thread,
    // hence volatile so the thread notices tasks added while it was working
    private volatile int add = 0;

    private Resource closedDirectory;
    private Resource openDirectory;

    private int maxThreads;

    /**
     * Creates the engine and starts the spooler thread right away when open tasks
     * are already present in the persistence directory.
     *
     * @param config config registered for the worker threads and passed to executed tasks
     * @param persisDirectory directory holding the "open" and "closed" task directories
     * @param label label of this engine
     * @param log log receiving execution results
     * @param maxThreads number of running task threads at which the spooler thread joins them
     */
    public SpoolerEngineImpl(Config config, Resource persisDirectory, String label, Log log, int maxThreads) {
        this.config = config;
        this.persisDirectory = persisDirectory;

        closedDirectory = persisDirectory.getRealResource("closed");
        openDirectory = persisDirectory.getRealResource("open");

        this.maxThreads = maxThreads;
        this.label = label;
        this.log = log;
        if (getOpenTaskCount() > 0) start();
    }

    /**
     * @return the maxThreads
     */
    public int getMaxThreads() {
        return maxThreads;
    }

    /**
     * @param res directory to inspect
     * @return value reported by ResourceUtil.directrySize for the task files in that directory
     */
    private int calculateSize(Resource res) {
        return ResourceUtil.directrySize(res, FILTER);
    }

    /**
     * Schedules the task for immediate execution, assigns it a fresh id, persists it
     * and makes sure the spooler thread is running.
     */
    @Override
    public synchronized void add(SpoolerTask task) {
        add++;
        task.setNextExecution(System.currentTimeMillis());
        task.setId(createId(task));
        store(task);
        start();
    }

    /**
     * Starts a new spooler thread when none is alive, otherwise wakes up a sleeping one.
     */
    private void start() {
        if (thread == null || !thread.isAlive()) {
            // the SpoolerThread constructor already lowers the priority and tolerates
            // a failure of that call, so it is not repeated here unguarded
            thread = new SpoolerThread(this);
            thread.start();
        }
        else if (thread.sleeping) {
            thread.interrupt();
        }
    }

    @Override
    public String getLabel() {
        return label;
    }

    /**
     * @return task stored in dir under the given id, or null when it cannot be read
     */
    private SpoolerTask getTaskById(Resource dir, String id) {
        return getTask(dir.getRealResource(id + ".tsk"), null);
    }

    /**
     * @return task stored in dir under the given file name, or null when it cannot be read
     */
    private SpoolerTask getTaskByName(Resource dir, String name) {
        return getTask(dir.getRealResource(name), null);
    }

    /**
     * Deserializes a task from the given resource. A resource that cannot be read is
     * deleted and the default value is returned instead.
     *
     * @param res serialized task
     * @param defaultValue value returned when reading fails
     * @return the task read, or defaultValue
     */
    private SpoolerTask getTask(Resource res, SpoolerTask defaultValue) {
        InputStream is = null;
        ObjectInputStream ois = null;
        SpoolerTask task = defaultValue;
        boolean unreadable = false;
        try {
            is = res.getInputStream();
            ois = new ObjectInputStream(is);
            task = (SpoolerTask) ois.readObject();
        }
        catch (Throwable t) {
            unreadable = true;
        }
        finally {
            IOUtil.closeEL(is);
            IOUtil.closeEL(ois);
        }
        // delete only after the streams are closed
        if (unreadable) res.delete();
        return task;
    }

    /**
     * Serializes the task to its file, replacing an existing file. A failure is logged
     * (it used to be dropped silently) but not propagated.
     */
    private void store(SpoolerTask task) {
        OutputStream os = null;
        ObjectOutputStream oos = null;
        Resource persis = getFile(task);
        if (persis.exists()) persis.delete();
        try {
            os = persis.getOutputStream();
            oos = new ObjectOutputStream(os);
            oos.writeObject(task);
        }
        catch (IOException e) {
            if (log != null) log.error("remote-client", "could not store task [" + task.getId() + "]:" + e.getMessage());
        }
        finally {
            IOUtil.closeEL(oos);
            // also covers the case where the ObjectOutputStream could not be created
            IOUtil.closeEL(os);
        }
    }

    /**
     * Deletes the file of the task, if there is one.
     */
    private void unstore(SpoolerTask task) {
        Resource persis = getFile(task);
        if (persis.exists()) persis.delete();
    }

    /**
     * @return file of the task inside the "closed" or "open" directory, depending on
     *         task.closed(); the directory is created when missing
     */
    private Resource getFile(SpoolerTask task) {
        Resource dir = persisDirectory.getRealResource(task.closed() ? "closed" : "open");
        dir.mkdirs();
        return dir.getRealResource(task.getId() + ".tsk");
    }

    /**
     * @return next zero padded id for which no file exists yet in the directory of the task
     */
    private String createId(SpoolerTask task) {
        Resource dir = persisDirectory.getRealResource(task.closed() ? "closed" : "open");
        dir.mkdirs();

        String id = null;
        do {
            id = StringUtil.addZeros(++count, 8);
        }
        while (dir.getRealResource(id + ".tsk").exists());
        return id;
    }

    /**
     * Picks the first execution plan whose cumulated tries exceed the tries of the
     * task and adds its interval to the last execution time.
     *
     * @param task task to schedule
     * @return next execution time in milliseconds, or -1 when no plan is left
     */
    public long calculateNextExecution(SpoolerTask task) {
        int _tries = 0;
        ExecutionPlan plan = null;
        ExecutionPlan[] plans = task.getPlans();

        for (int i = 0; i < plans.length; i++) {
            _tries += plans[i].getTries();
            if (_tries > task.tries()) {
                plan = plans[i];
                break;
            }
        }
        if (plan == null) return -1;
        // 1000L: multiply in long arithmetic so a large interval cannot overflow
        return task.lastExecution() + (plan.getIntervall() * 1000L);
    }

    /**
     * @param startrow 1 based index of the first task
     * @param maxrow maximal number of rows, a negative value means no limit
     * @return open tasks as query
     */
    public Query getOpenTasksAsQuery(int startrow, int maxrow) throws PageException {
        return getTasksAsQuery(createQuery(), openDirectory, startrow, maxrow);
    }

    /**
     * @param startrow 1 based index of the first task
     * @param maxrow maximal number of rows, a negative value means no limit
     * @return closed tasks as query
     */
    public Query getClosedTasksAsQuery(int startrow, int maxrow) throws PageException {
        return getTasksAsQuery(createQuery(), closedDirectory, startrow, maxrow);
    }

    /**
     * Returns open tasks followed by closed tasks in a single query.
     *
     * @param startrow 1 based index of the first task, counted over open then closed tasks
     * @param maxrow maximal number of rows, a negative value means no limit
     */
    public Query getAllTasksAsQuery(int startrow, int maxrow) throws PageException {
        Query query = createQuery();
        // normalize before the first lookup, otherwise open tasks get lost for "no limit"
        if (maxrow < 0) maxrow = Integer.MAX_VALUE;

        getTasksAsQuery(query, openDirectory, startrow, maxrow);
        int records = query.getRecordcount();
        if (records == 0) {
            // nothing taken from the open tasks: shift the offset into the closed ones
            startrow -= getOpenTaskCount();
            if (startrow < 1) startrow = 1;
        }
        else {
            startrow = 1;
            maxrow -= records;
        }
        if (maxrow > 0) getTasksAsQuery(query, closedDirectory, startrow, maxrow);
        return query;
    }

    public int getOpenTaskCount() {
        return calculateSize(openDirectory);
    }

    public int getClosedTaskCount() {
        return calculateSize(closedDirectory);
    }

    /**
     * Appends at most maxrow tasks of the directory to the query, beginning with the
     * task at the 1 based position startrow. Tasks that cannot be read are skipped.
     *
     * @param qry query to extend
     * @param dir directory holding the task files
     * @param startrow 1 based index of the first task, values below 1 are handled as 1
     * @param maxrow maximal number of tasks, a negative value means no limit
     * @return the query passed in
     */
    private Query getTasksAsQuery(Query qry, Resource dir, int startrow, int maxrow) {
        String[] children = dir.list(FILTER);
        if (ArrayUtil.isEmpty(children)) return qry;

        if (startrow < 1) startrow = 1;
        int from = startrow - 1;
        int available = children.length - from;
        if (available <= 0) return qry;

        int limit = (maxrow < 0 || maxrow > available) ? available : maxrow;
        int to = from + limit;

        SpoolerTask task;
        for (int i = from; i < to; i++) {
            task = getTaskByName(dir, children[i]);
            if (task != null) addQueryRow(qry, task);
        }
        return qry;
    }

    /**
     * @return empty query with the columns used to describe a task
     */
    private Query createQuery() throws DatabaseException {
        String v = "VARCHAR";
        String d = "DATE";
        railo.runtime.type.Query qry = new QueryImpl(
                new String[]{"type", "name", "detail", "id", "lastExecution", "nextExecution", "closed", "tries", "exceptions", "triesmax"},
                new String[]{v, v, "object", v, d, d, "boolean", "int", "object", "int"},
                0, "query");
        return qry;
    }

    /**
     * Adds a row describing the task. Filling the row is best effort: when a value
     * cannot be set the remaining columns of that row are left as they are.
     */
    private void addQueryRow(railo.runtime.type.Query qry, SpoolerTask task) {
        int row = qry.addRow();
        try {
            qry.setAt(KeyConstants._type, row, task.getType());
            qry.setAt(KeyConstants._name, row, task.subject());
            qry.setAt(KeyConstants._detail, row, task.detail());
            qry.setAt(KeyConstants._id, row, task.getId());

            qry.setAt(LAST_EXECUTION, row, new DateTimeImpl(task.lastExecution(), true));
            qry.setAt(NEXT_EXECUTION, row, new DateTimeImpl(task.nextExecution(), true));
            qry.setAt(CLOSED, row, Caster.toBoolean(task.closed()));
            qry.setAt(TRIES, row, Caster.toDouble(task.tries()));
            // provisional value, stays in place when one of the following steps fails
            qry.setAt(TRIES_MAX, row, Caster.toDouble(task.tries()));
            qry.setAt(KeyConstants._exceptions, row, translateTime(task.getExceptions()));

            // maximal tries = sum of the tries of all execution plans
            int triesMax = 0;
            ExecutionPlan[] plans = task.getPlans();
            for (int y = 0; y < plans.length; y++) {
                triesMax += plans[y].getTries();
            }
            qry.setAt(TRIES_MAX, row, Caster.toDouble(triesMax));
        }
        catch (Throwable t) {
            // best effort, see method comment
        }
    }

    /**
     * @param exp array of structs carrying a "time" value
     * @return copy of the array in which every "time" value is replaced by a DateTimeImpl
     */
    private Array translateTime(Array exp) {
        exp = (Array) Duplicator.duplicate(exp, true);
        Iterator<Object> it = exp.valueIterator();
        Struct sct;
        while (it.hasNext()) {
            sct = (Struct) it.next();
            sct.setEL(KeyConstants._time, new DateTimeImpl(Caster.toLongValue(sct.get(KeyConstants._time, null), 0), true));
        }
        return exp;
    }

    /**
     * Background thread that executes due open tasks and sleeps until the next one is due.
     * It ends when no open task is left or no open task has a future execution time.
     */
    class SpoolerThread extends Thread {

        private SpoolerEngineImpl engine;
        // written by this thread, read by threads calling SpoolerEngineImpl.start()
        private volatile boolean sleeping;
        private final int maxThreads;

        public SpoolerThread(SpoolerEngineImpl engine) {
            this.maxThreads = engine.getMaxThreads();
            this.engine = engine;
            try {
                this.setPriority(MIN_PRIORITY);
            }
            // can throw security exceptions
            catch (Throwable t) {}
        }

        public void run() {
            String[] taskNames;
            SpoolerTask task = null;
            long nextExection;
            ThreadLocalConfig.register(engine.config);
            List<TaskThread> runningTasks = new ArrayList<TaskThread>();
            TaskThread tt;
            int adds;

            try {
                while (getOpenTaskCount() > 0) {
                    adds = engine.adds();
                    taskNames = openDirectory.list(FILTER);
                    // the directory may have been removed since the count was taken
                    if (taskNames == null) taskNames = new String[0];
                    nextExection = Long.MAX_VALUE;
                    for (int i = 0; i < taskNames.length; i++) {
                        task = getTaskByName(openDirectory, taskNames[i]);
                        if (task == null) continue;

                        if (task.nextExecution() <= System.currentTimeMillis()) {
                            tt = new TaskThread(engine, task);
                            tt.start();
                            runningTasks.add(tt);
                        }
                        else if (task.nextExecution() < nextExection && !task.closed()) {
                            nextExection = task.nextExecution();
                        }
                        // join the running tasks as soon as maxThreads of them are running
                        nextExection = joinTasks(runningTasks, maxThreads, nextExection);
                    }

                    // wait for everything still running
                    nextExection = joinTasks(runningTasks, 0, nextExection);
                    // tasks were added in the meantime, look at the directory again
                    if (adds != engine.adds()) continue;

                    if (nextExection == Long.MAX_VALUE) break;
                    long sleep = nextExection - System.currentTimeMillis();

                    if (sleep > 0) doWait(sleep);
                }
            }
            finally {
                ThreadLocalConfig.release();
            }
        }

        /**
         * Joins all running task threads when at least maxThreads of them are running
         * and clears the list afterwards.
         *
         * @return the smallest next execution time among nextExection and the joined
         *         tasks that are still open and have a next execution
         */
        private long joinTasks(List<TaskThread> runningTasks, int maxThreads, long nextExection) {
            if (runningTasks.size() >= maxThreads) {
                Iterator<TaskThread> it = runningTasks.iterator();
                TaskThread tt;
                SpoolerTask task;
                while (it.hasNext()) {
                    tt = it.next();
                    SystemUtil.join(tt);
                    task = tt.getTask();

                    if (task != null && task.nextExecution() != -1 && task.nextExecution() < nextExection && !task.closed()) {
                        nextExection = task.nextExecution();
                    }
                }
                runningTasks.clear();
            }
            return nextExection;
        }

        /**
         * Waits for the given time in milliseconds; SpoolerEngineImpl.start() interrupts
         * the wait when it finds this thread sleeping.
         */
        private void doWait(long sleep) {
            try {
                sleeping = true;
                synchronized (this) {
                    wait(sleep);
                }
            }
            catch (InterruptedException e) {
                // the interrupt is the wake up signal, the run loop simply goes on
            }
            finally {
                sleeping = false;
            }
        }

    }

    /**
     * Thread executing a single task through the engine.
     */
    class TaskThread extends Thread {

        private SpoolerEngineImpl engine;
        private SpoolerTask task;

        public TaskThread(SpoolerEngineImpl engine, SpoolerTask task) {
            this.engine = engine;
            this.task = task;
        }

        public SpoolerTask getTask() {
            return task;
        }

        public void run() {
            ThreadLocalConfig.register(engine.config);
            try {
                engine.execute(task);
            }
            finally {
                ThreadLocalConfig.release();
            }
        }
    }

    /**
     * remove that task from Spooler
     * @param task
     */
    public void remove(SpoolerTask task) {
        unstore(task);
    }

    /**
     * Removes the children of the open and the closed directory, twice with a pause of
     * 100 milliseconds in between.
     */
    public void removeAll() {
        ResourceUtil.removeChildrenEL(openDirectory);
        ResourceUtil.removeChildrenEL(closedDirectory);
        SystemUtil.sleep(100);
        ResourceUtil.removeChildrenEL(openDirectory);
        ResourceUtil.removeChildrenEL(closedDirectory);
    }

    /**
     * @return number of tasks added through add(SpoolerTask) so far
     */
    public int adds() {
        return add;
    }

    @Override
    public void remove(String id) {
        SpoolerTask task = getTaskById(openDirectory, id);
        if (task == null) task = getTaskById(closedDirectory, id);
        if (task != null) remove(task);
    }

    /**
     * execute task by id and return the error thrown by the task
     * @param id id of an open or closed task
     * @return exception thrown by the task, null when the task succeeded or does not exist
     */
    public PageException execute(String id) {
        SpoolerTask task = getTaskById(openDirectory, id);
        if (task == null) task = getTaskById(closedDirectory, id);
        if (task != null) {
            return execute(task);
        }
        return null;
    }

    /**
     * Executes the task. On success its file is removed. On failure the task is
     * rescheduled according to its execution plans, or moved to the closed directory
     * when no plan is left.
     *
     * @param task task to execute
     * @return exception raised during execution, null on success
     */
    public PageException execute(SpoolerTask task) {
        try {
            ((SpoolerTaskSupport) task)._execute(config);

            unstore(task);
            log.info("remote-client", task.subject());
            task.setLastExecution(System.currentTimeMillis());
            task.setNextExecution(-1);

            task.setClosed(true);
        }
        catch (Throwable t) {
            task.setLastExecution(System.currentTimeMillis());
            task.setNextExecution(calculateNextExecution(task));
            log.error("remote-client", task.subject() + ":" + t.getMessage());
            if (task.nextExecution() == -1) {
                // no plan left: remove the open file before the closed flag changes
                // the location the task maps to, then store it as closed
                unstore(task);
                task.setClosed(true);
                store(task);
            }
            else {
                store(task);
            }

            return Caster.toPageException(t);
        }
        return null;
    }

    public void setLabel(String label) {
        this.label = label;
    }

    /**
     * Sets the persistence directory and re-derives the open and closed directories
     * from it, so listing and storing tasks keep using the same location.
     */
    public void setPersisDirectory(Resource persisDirectory) {
        this.persisDirectory = persisDirectory;
        this.closedDirectory = persisDirectory.getRealResource("closed");
        this.openDirectory = persisDirectory.getRealResource("open");
    }

    public void setLog(Log log) {
        this.log = log;
    }

    public void setConfig(Config config) {
        this.config = config;
    }

}

/**
 * Accepts resources whose name ends with ".tsk".
 */
class TaskFileFilter implements ResourceNameFilter {

    public boolean accept(Resource parent, String name) {
        return name != null && name.endsWith(".tsk");
    }

}