001    package railo.runtime.spooler;
002    
003    import java.io.IOException;
004    import java.io.InputStream;
005    import java.io.ObjectInputStream;
006    import java.io.ObjectOutputStream;
007    import java.util.ArrayList;
008    import java.util.Iterator;
009    import java.util.List;
010    
011    import railo.commons.io.IOUtil;
012    import railo.commons.io.SystemUtil;
013    import railo.commons.io.log.Log;
014    import railo.commons.io.res.Resource;
015    import railo.commons.io.res.filter.ResourceNameFilter;
016    import railo.commons.io.res.util.ResourceUtil;
017    import railo.commons.lang.StringUtil;
018    import railo.runtime.config.Config;
019    import railo.runtime.engine.ThreadLocalConfig;
020    import railo.runtime.exp.DatabaseException;
021    import railo.runtime.exp.PageException;
022    import railo.runtime.op.Caster;
023    import railo.runtime.op.Duplicator;
024    import railo.runtime.type.Array;
025    import railo.runtime.type.Collection;
026    import railo.runtime.type.KeyImpl;
027    import railo.runtime.type.Query;
028    import railo.runtime.type.QueryImpl;
029    import railo.runtime.type.Struct;
030    import railo.runtime.type.dt.DateTimeImpl;
031    import railo.runtime.type.util.ArrayUtil;
032    import railo.runtime.type.util.KeyConstants;
033    
034    public class SpoolerEngineImpl implements SpoolerEngine {
035            
036            private static final TaskFileFilter FILTER=new TaskFileFilter();
037    
038            private static final Collection.Key LAST_EXECUTION = KeyImpl.intern("lastExecution");
039            private static final Collection.Key NEXT_EXECUTION = KeyImpl.intern("nextExecution");
040            
041            private static final Collection.Key CLOSED = KeyImpl.intern("closed");
042            private static final Collection.Key TRIES = KeyImpl.intern("tries");
043            private static final Collection.Key TRIES_MAX = KeyImpl.intern("triesmax");
044    
045            
046            private String label;
047            
048    
049            //private LinkedList<SpoolerTask> openTaskss=new LinkedList<SpoolerTask>();
050            //private LinkedList<SpoolerTask> closedTasks=new LinkedList<SpoolerTask>();
051            private SpoolerThread thread;
052            //private ExecutionPlan[] plans;
053            private Resource persisDirectory;
054            private long count=0;
055            private Log log;
056            private Config config; 
057            private int add=0;
058    
059    
060            private Resource closedDirectory;
061            private Resource openDirectory;
062    
063            private int maxThreads;
064            
065            public SpoolerEngineImpl(Config config,Resource persisDirectory,String label, Log log, int maxThreads) {
066                    this.config=config;
067                    this.persisDirectory=persisDirectory;
068    
069                    closedDirectory = persisDirectory.getRealResource("closed");
070                    openDirectory = persisDirectory.getRealResource("open");
071                    //calculateSize();
072                    
073    
074                    this.maxThreads=maxThreads;
075                    this.label=label;
076                    this.log=log;
077                    //print.ds(persisDirectory.getAbsolutePath());
078                    //load();
079                    if(getOpenTaskCount()>0)start();
080            }
081    
082            /*private void calculateSize() {
083                    closedCount=calculateSize(closedDirectory);
084                    openCount=calculateSize(openDirectory);
085            }*/
086    
087            /**
088             * @return the maxThreads
089             */
090            public int getMaxThreads() {
091                    return maxThreads;
092            }
093    
094            private int calculateSize(Resource res) {
095                    return ResourceUtil.directrySize(res,FILTER);
096            }
097    
098            @Override
099            public synchronized void add(SpoolerTask task) {
100                    //openTasks.add(task);
101                    add++;
102                    task.setNextExecution(System.currentTimeMillis());
103                    task.setId(createId(task));
104                    store(task);
105                    start();
106            }
107    
108    
109            private void start() {
110                    if(thread==null || !thread.isAlive()) {
111                            thread=new SpoolerThread(this);
112                            thread.setPriority(Thread.MIN_PRIORITY);
113                            thread.start();
114                    }
115                    else if(thread.sleeping) {
116                            thread.interrupt();
117                    }
118                    //else print.out("- existing");
119            }
120    
121            @Override
122            public String getLabel() {
123                    return label;
124            }
125            
126            
127    
128            private SpoolerTask getTaskById(Resource dir,String id) {
129                    return getTask(dir.getRealResource(id+".tsk"),null);
130            }
131            
132            private SpoolerTask getTaskByName(Resource dir,String name) {
133                    return getTask(dir.getRealResource(name),null);
134            }
135    
136            private SpoolerTask getTask(Resource res, SpoolerTask defaultValue) {
137                    InputStream is = null;
138            ObjectInputStream ois = null;
139            
140            SpoolerTask task=defaultValue;
141                    try {
142                            is = res.getInputStream();
143                    ois = new ObjectInputStream(is);
144                    task = (SpoolerTask) ois.readObject();
145            } 
146            catch (Throwable t) {//t.printStackTrace();
147                    IOUtil.closeEL(is);
148                    IOUtil.closeEL(ois);
149                    res.delete();
150            }
151            IOUtil.closeEL(is);
152            IOUtil.closeEL(ois);
153                    return task;
154            }
155    
156            private void store(SpoolerTask task) {
157                    ObjectOutputStream oos=null;
158                    Resource persis = getFile(task);
159                    if(persis.exists()) persis.delete();
160            try {
161                    oos = new ObjectOutputStream(persis.getOutputStream());
162                    oos.writeObject(task);
163            } 
164            catch (IOException e) {}
165            finally {
166                    IOUtil.closeEL(oos);
167            }
168            }
169    
170            private void unstore(SpoolerTask task) {
171                    Resource persis = getFile(task);
172                    boolean exists=persis.exists();
173                    if(exists) persis.delete(); 
174            }
175            private Resource getFile(SpoolerTask task) {
176                    Resource dir = persisDirectory.getRealResource(task.closed()?"closed":"open");
177                    dir.mkdirs();
178                    return dir.getRealResource(task.getId()+".tsk");
179            }
180            
181            private String createId(SpoolerTask task) {
182                    Resource dir = persisDirectory.getRealResource(task.closed()?"closed":"open");
183                    dir.mkdirs();
184                    
185                    String id=null;
186                    do{
187                            id=StringUtil.addZeros(++count, 8);
188                    }while(dir.getRealResource(id+".tsk").exists());
189                    return id;
190            }
191    
192            public long calculateNextExecution(SpoolerTask task) {
193                    int _tries=0;
194                    ExecutionPlan plan=null;
195                    ExecutionPlan[] plans=task.getPlans();
196                    
197                    for(int i=0;i<plans.length;i++) {
198                            _tries+=plans[i].getTries();
199                            if(_tries>task.tries()) {
200                                    plan=plans[i];
201                                    break;
202                            }
203                    }
204                    if(plan==null)return -1;
205                    return task.lastExecution()+(plan.getIntervall()*1000);
206            }
207            
208            public Query getOpenTasksAsQuery(int startrow, int maxrow) throws PageException {
209                    return getTasksAsQuery(createQuery(),openDirectory,startrow, maxrow);
210            }
211    
212            public Query getClosedTasksAsQuery(int startrow, int maxrow) throws PageException {
213                    return getTasksAsQuery(createQuery(),closedDirectory,startrow, maxrow);
214            }
215    
216            public Query getAllTasksAsQuery(int startrow, int maxrow) throws PageException {
217                    Query query = createQuery();
218                    //print.o(startrow+":"+maxrow);
219                    getTasksAsQuery(query,openDirectory,startrow, maxrow);
220                    int records = query.getRecordcount();
221                    if(maxrow<0) maxrow=Integer.MAX_VALUE;
222                    // no open tasks
223                    if(records==0) {
224                            startrow-=getOpenTaskCount();
225                            if(startrow<1) startrow=1;
226                    }
227                    else {
228                            startrow=1;
229                            maxrow-=records;
230                    }
231                    if(maxrow>0)getTasksAsQuery(query,closedDirectory,startrow, maxrow);
232                    return query;
233            }
234            
235            public int getOpenTaskCount() {
236                    return calculateSize(openDirectory);
237            }
238            
239            public int getClosedTaskCount() {
240                    return calculateSize(closedDirectory);
241            }
242            
243            
244            private Query getTasksAsQuery(Query qry,Resource dir, int startrow, int maxrow) {
245                    String[] children = dir.list(FILTER);
246                    if(ArrayUtil.isEmpty(children)) return qry;
247                    if(children.length<maxrow)maxrow=children.length;
248                    SpoolerTask task;
249                    
250                    int to=startrow+maxrow;
251                    if(to>children.length)to=children.length;
252                    if(startrow<1)startrow=1;
253                    
254                    for(int i=startrow-1;i<to;i++){
255                            task = getTaskByName(dir, children[i]);
256                            if(task!=null)addQueryRow(qry, task);
257                    }
258                    
259                    return qry;
260            }
261            
262            private Query createQuery() throws DatabaseException {
263                    String v="VARCHAR";
264                    String d="DATE";
265                    railo.runtime.type.Query qry=new QueryImpl(
266                                    new String[]{"type","name","detail","id","lastExecution","nextExecution","closed","tries","exceptions","triesmax"},
267                                    new String[]{v,v,"object",v,d,d,"boolean","int","object","int"},
268                                    0,"query");
269                    return qry;
270            }
271            
272            private void addQueryRow(railo.runtime.type.Query qry, SpoolerTask task) {
273            int row = qry.addRow();
274                    try{
275                            qry.setAt(KeyConstants._type, row, task.getType());
276                            qry.setAt(KeyConstants._name, row, task.subject());
277                            qry.setAt(KeyConstants._detail, row, task.detail());
278                            qry.setAt(KeyConstants._id, row, task.getId());
279    
280                            
281                            qry.setAt(LAST_EXECUTION, row,new DateTimeImpl(task.lastExecution(),true));
282                            qry.setAt(NEXT_EXECUTION, row,new DateTimeImpl(task.nextExecution(),true));
283                            qry.setAt(CLOSED, row,Caster.toBoolean(task.closed()));
284                            qry.setAt(TRIES, row,Caster.toDouble(task.tries()));
285                            qry.setAt(TRIES_MAX, row,Caster.toDouble(task.tries()));
286                            qry.setAt(KeyConstants._exceptions, row,translateTime(task.getExceptions()));
287                            
288                            int triesMax=0;
289                            ExecutionPlan[] plans = task.getPlans();
290                            for(int y=0;y<plans.length;y++) {
291                                    triesMax+=plans[y].getTries();
292                            }
293                            qry.setAt(TRIES_MAX, row,Caster.toDouble(triesMax));
294                    }
295                    catch(Throwable t){}
296            }
297            
298            private Array translateTime(Array exp) {
299                    exp=(Array) Duplicator.duplicate(exp,true);
300                    Iterator<Object> it = exp.valueIterator();
301                    Struct sct;
302                    while(it.hasNext()) {
303                            sct=(Struct) it.next();
304                            sct.setEL(KeyConstants._time,new DateTimeImpl(Caster.toLongValue(sct.get(KeyConstants._time,null),0),true));
305                    }
306                    return exp;
307            }
308    
309            class SpoolerThread extends Thread {
310    
311                    private SpoolerEngineImpl engine;
312                    private boolean sleeping;
313                    private final int maxThreads;
314    
315                    public SpoolerThread(SpoolerEngineImpl engine) {
316                            this.maxThreads=engine.getMaxThreads();
317                            this.engine=engine;
318                            try{
319                                    this.setPriority(MIN_PRIORITY);
320                            }
321                            // can throw security exceptions
322                            catch(Throwable t){}
323                    }
324                    
325                    public void run() {
326                            String[] taskNames;
327                            //SpoolerTask[] tasks;
328                            SpoolerTask task=null;
329                            long nextExection;
330                            ThreadLocalConfig.register(engine.config);
331                            //ThreadLocalPageContext.register(engine.);
332                            List<TaskThread> runningTasks=new ArrayList<TaskThread>();
333                            TaskThread tt;
334                            int adds;
335                            
336                            while(getOpenTaskCount()>0) {
337                                    adds=engine.adds();
338                                    taskNames = openDirectory.list(FILTER);
339                                    //tasks=engine.getOpenTasks();
340                                    nextExection=Long.MAX_VALUE;
341                                    for(int i=0;i<taskNames.length;i++) {
342                                            task=getTaskByName(openDirectory, taskNames[i]);
343                                            if(task==null) continue;
344                                            
345                                            if(task.nextExecution()<=System.currentTimeMillis()) {
346                                                    //print.o("- execute");
347                                                    tt=new TaskThread(engine,task);
348                                                    tt.start();
349                                                    runningTasks.add(tt);
350                                            }
351                                            else if(task.nextExecution()<nextExection && 
352                                                            nextExection!=-1 && 
353                                                            !task.closed()) 
354                                                    nextExection=task.nextExecution();
355                                            nextExection=joinTasks(runningTasks,maxThreads,nextExection);
356                                    }
357                                    
358                                    nextExection=joinTasks(runningTasks,0,nextExection);
359                                    if(adds!=engine.adds()) continue;
360                                    
361                                    if(nextExection==Long.MAX_VALUE)break;
362                                    long sleep = nextExection-System.currentTimeMillis();
363                                    
364                                    //print.o("sleep:"+sleep+">"+(sleep/1000));
365                                    if(sleep>0)doWait(sleep);
366                                    
367                                    //if(sleep<0)break;
368                            }
369                            //print.o("end:"+getOpenTaskCount());
370                    }
371    
372                    private long joinTasks(List<TaskThread> runningTasks, int maxThreads,long nextExection) {
373                            if(runningTasks.size()>=maxThreads){
374                                    Iterator<TaskThread> it = runningTasks.iterator();
375                                    TaskThread tt;
376                                    SpoolerTask task;
377                                    while(it.hasNext()){
378                                            tt = it.next();
379                                            SystemUtil.join(tt);
380                                            task = tt.getTask();
381    
382                                            if(task!=null && task.nextExecution()!=-1 && task.nextExecution()<nextExection && !task.closed()) {
383                                                    nextExection=task.nextExecution();
384                                            }
385                                    }
386                                    runningTasks.clear();
387                            }
388                            return nextExection;
389                    }
390    
391                    private void doWait(long sleep) {
392                            //long start=System.currentTimeMillis();
393                            try {
394                                    sleeping=true;
395                                    synchronized (this) {
396                                            wait(sleep);
397                                    }
398                                    
399                            } catch (Throwable t) {
400                                    //
401                            }
402                            finally {
403                                    sleeping=false;
404                            }
405                    }
406                    
407            }
408            
409            
410            class TaskThread extends Thread {
411                    
412                    private SpoolerEngineImpl engine;
413                    private SpoolerTask task;
414    
415                    public TaskThread(SpoolerEngineImpl engine,SpoolerTask task) {
416                            this.engine=engine;
417                            this.task=task;
418                    }
419                    
420                    public SpoolerTask getTask() {
421                            return task;
422                    }
423    
424                    public void run() {
425                            ThreadLocalConfig.register(engine.config);
426                            engine.execute(task);
427                            ThreadLocalConfig.release();
428                            
429                    }
430            }
431            
432    
433            /**
434             * remove that task from Spooler
435             * @param task
436             */
437            public void remove(SpoolerTask task) {
438                    unstore(task);
439                    //if(!openTasks.remove(task))closedTasks.remove(task);
440            }
441            
442            public void removeAll() {
443                    ResourceUtil.removeChildrenEL(openDirectory);
444                    ResourceUtil.removeChildrenEL(closedDirectory);
445                    SystemUtil.sleep(100);
446                    ResourceUtil.removeChildrenEL(openDirectory);
447                    ResourceUtil.removeChildrenEL(closedDirectory);
448            }
449            
450            public int adds() {
451                    //return openTasks.size()>0;
452                    return add;
453            }    
454    
455            @Override
456            public void remove(String id) {
457                    SpoolerTask task = getTaskById(openDirectory,id);
458                    if(task==null)task=getTaskById(closedDirectory,id);
459                    if(task!=null)remove(task);
460            }
461    
462            /*private SpoolerTask getTaskById(SpoolerTask[] tasks, String id) {
463                    for(int i=0;i<tasks.length;i++) {
464                            if(tasks[i].getId().equals(id)) {
465                                    return tasks[i];
466                            }
467                    }
468                    return null;
469            }*/
470    
471            /**
472             * execute task by id and return eror throwd by task
473             * @param id
474             * @throws SpoolerException
475             */
476            public PageException execute(String id) {
477                    SpoolerTask task = getTaskById(openDirectory,id);
478                    if(task==null)task=getTaskById(closedDirectory,id);
479                    if(task!=null){
480                            return execute(task);
481                    }
482                    return null;
483            }
484            
485            public PageException execute(SpoolerTask task) {
486                    //task.closed();
487                    try {
488                            ((SpoolerTaskSupport)task)._execute(config);
489                            //if(task.closed())closedTasks.remove(task);
490                            //else openTasks.remove(task);
491                            
492                            unstore(task);
493                            log.info("remote-client", task.subject());
494                            task.setLastExecution(System.currentTimeMillis());
495                            task.setNextExecution(-1);
496                            
497                            task.setClosed(true);
498                            task=null;
499                    } 
500                    catch(Throwable t) {
501                            task.setLastExecution(System.currentTimeMillis());
502                            task.setNextExecution(calculateNextExecution(task));
503                            log.error("remote-client", task.subject()+":"+t.getMessage());
504                            if(task.nextExecution()==-1) {
505                                    //openTasks.remove(task);
506                                    //if(!closedTasks.contains(task))closedTasks.add(task);
507                                    unstore(task);
508                                    task.setClosed(true);
509                                    store(task);
510                                    task=null;
511                            }
512                            else 
513                                    store(task);
514                            
515                            return Caster.toPageException(t);
516                    }
517                    return null;
518            }
519    
520            public void setLabel(String label) {
521                    this.label = label;
522            }
523    
524            public void setPersisDirectory(Resource persisDirectory) {
525                    this.persisDirectory = persisDirectory;
526            }
527    
528            public void setLog(Log log) {
529                    this.log = log;
530            }
531    
532            public void setConfig(Config config) {
533                    this.config = config;
534            }
535            
536    }
537            
538    class TaskFileFilter implements ResourceNameFilter {
539    
540            public boolean accept(Resource parent, String name) {
541                    return name!=null && name.endsWith(".tsk");
542            }
543            
544    }