001    package railo.runtime.spooler;
002    
003    import java.io.IOException;
004    import java.io.InputStream;
005    import java.io.ObjectInputStream;
006    import java.io.ObjectOutputStream;
007    import java.util.ArrayList;
008    import java.util.Iterator;
009    import java.util.List;
010    
011    import railo.commons.io.IOUtil;
012    import railo.commons.io.SystemUtil;
013    import railo.commons.io.log.Log;
014    import railo.commons.io.res.Resource;
015    import railo.commons.io.res.filter.ResourceNameFilter;
016    import railo.commons.io.res.util.ResourceUtil;
017    import railo.commons.lang.StringUtil;
018    import railo.runtime.config.Config;
019    import railo.runtime.engine.ThreadLocalConfig;
020    import railo.runtime.exp.ApplicationException;
021    import railo.runtime.exp.DatabaseException;
022    import railo.runtime.exp.PageException;
023    import railo.runtime.exp.PageRuntimeException;
024    import railo.runtime.op.Caster;
025    import railo.runtime.type.Array;
026    import railo.runtime.type.Collection;
027    import railo.runtime.type.KeyImpl;
028    import railo.runtime.type.Query;
029    import railo.runtime.type.QueryImpl;
030    import railo.runtime.type.Struct;
031    import railo.runtime.type.dt.DateTimeImpl;
032    import railo.runtime.type.util.ArrayUtil;
033    
034    public class SpoolerEngineImpl implements SpoolerEngine {
035            
036            private static final TaskFileFilter FILTER=new TaskFileFilter();
037    
038            private static final Collection.Key LAST_EXECUTION = KeyImpl.intern("lastExecution");
039            private static final Collection.Key NEXT_EXECUTION = KeyImpl.intern("nextExecution");
040            
041            private static final Collection.Key CLOSED = KeyImpl.intern("closed");
042            private static final Collection.Key TRIES = KeyImpl.intern("tries");
043            private static final Collection.Key TRIES_MAX = KeyImpl.intern("triesmax");
044    
045            
046            private String label;
047            
048    
049            //private LinkedList<SpoolerTask> openTaskss=new LinkedList<SpoolerTask>();
050            //private LinkedList<SpoolerTask> closedTasks=new LinkedList<SpoolerTask>();
051            private SpoolerThread thread;
052            //private ExecutionPlan[] plans;
053            private Resource persisDirectory;
054            private long count=0;
055            private Log log;
056            private Config config; 
057            private int add=0;
058    
059    
060            private Resource closedDirectory;
061            private Resource openDirectory;
062    
063            private int maxThreads;
064            
065            public SpoolerEngineImpl(Config config,Resource persisDirectory,String label, Log log, int maxThreads) throws IOException {
066                    this.config=config;
067                    this.persisDirectory=persisDirectory;
068    
069                    closedDirectory = persisDirectory.getRealResource("closed");
070                    openDirectory = persisDirectory.getRealResource("open");
071                    //calculateSize();
072                    
073    
074                    this.maxThreads=maxThreads;
075                    this.label=label;
076                    this.log=log;
077                    //print.ds(persisDirectory.getAbsolutePath());
078                    //load();
079                    if(getOpenTaskCount()>0)start();
080            }
081    
082            /*private void calculateSize() {
083                    closedCount=calculateSize(closedDirectory);
084                    openCount=calculateSize(openDirectory);
085            }*/
086    
087            /**
088             * @return the maxThreads
089             */
090            public int getMaxThreads() {
091                    return maxThreads;
092            }
093    
094            private int calculateSize(Resource res) {
095                    return ResourceUtil.directrySize(res,FILTER);
096            }
097    
098            /**
099             * @see railo.runtime.spooler.SpoolerEngine#add(railo.runtime.spooler.SpoolerTask)
100             */
101            public synchronized void add(SpoolerTask task) {
102                    //openTasks.add(task);
103                    add++;
104                    task.setNextExecution(System.currentTimeMillis());
105                    task.setId(createId(task));
106                    store(task);
107                    start();
108            }
109    
110    
111            private void start() {
112                    if(thread==null || !thread.isAlive()) {
113                            thread=new SpoolerThread(this);
114                            thread.setPriority(Thread.MIN_PRIORITY);
115                            thread.start();
116                    }
117                    else if(thread.sleeping) {
118                            thread.interrupt();
119                    }
120                    //else print.out("- existing");
121            }
122    
123            /**
124             * @see railo.runtime.spooler.SpoolerEngine#getLabel()
125             */
126            public String getLabel() {
127                    return label;
128            }
129            
130            
131    
132            private SpoolerTask getTaskById(Resource dir,String id) {
133                    return getTask(dir.getRealResource(id+".tsk"),null);
134            }
135            
136            private SpoolerTask getTaskByName(Resource dir,String name) {
137                    return getTask(dir.getRealResource(name),null);
138            }
139    
140            private SpoolerTask getTask(Resource res, SpoolerTask defaultValue) {
141                    InputStream is = null;
142            ObjectInputStream ois = null;
143            
144            SpoolerTask task=defaultValue;
145                    try {
146                            is = res.getInputStream();
147                    ois = new ObjectInputStream(is);
148                    task = (SpoolerTask) ois.readObject();
149            } 
150            catch (Throwable t) {//t.printStackTrace();
151                    IOUtil.closeEL(is);
152                    IOUtil.closeEL(ois);
153                    res.delete();
154            }
155            IOUtil.closeEL(is);
156            IOUtil.closeEL(ois);
157                    return task;
158            }
159    
160            private void store(SpoolerTask task) {
161                    ObjectOutputStream oos=null;
162                    Resource persis = getFile(task);
163                    if(persis.exists()) persis.delete();
164            try {
165                    oos = new ObjectOutputStream(persis.getOutputStream());
166                    oos.writeObject(task);
167            } 
168            catch (IOException e) {}
169            finally {
170                    IOUtil.closeEL(oos);
171            }
172            }
173    
174            private void unstore(SpoolerTask task) {
175                    Resource persis = getFile(task);
176                    boolean exists=persis.exists();
177                    if(exists) persis.delete(); 
178            }
179            private Resource getFile(SpoolerTask task) {
180                    Resource dir = persisDirectory.getRealResource(task.closed()?"closed":"open");
181                    dir.mkdirs();
182                    return dir.getRealResource(task.getId()+".tsk");
183            }
184            
185            private String createId(SpoolerTask task) {
186                    Resource dir = persisDirectory.getRealResource(task.closed()?"closed":"open");
187                    dir.mkdirs();
188                    
189                    String id=null;
190                    do{
191                            id=StringUtil.addZeros(++count, 8);
192                    }while(dir.getRealResource(id+".tsk").exists());
193                    return id;
194            }
195            
196    
197            /**
198             * @see railo.runtime.spooler.SpoolerEngine#calculateNextExecution(railo.runtime.spooler.SpoolerTask)
199             */
200            public long calculateNextExecution(SpoolerTask task) {
201                    int _tries=0;
202                    ExecutionPlan plan=null;
203                    ExecutionPlan[] plans=task.getPlans();
204                    
205                    for(int i=0;i<plans.length;i++) {
206                            _tries+=plans[i].getTries();
207                            if(_tries>task.tries()) {
208                                    plan=plans[i];
209                                    break;
210                            }
211                    }
212                    if(plan==null)return -1;
213                    return task.lastExecution()+(plan.getIntervall()*1000);
214            }
215    
216            /**
217             * @see railo.runtime.spooler.SpoolerEngine#getOpenTasks()
218             */
219            public SpoolerTask[] getOpenTasks() {
220                    throw new PageRuntimeException(new ApplicationException("this method is no longer supported"));
221            }
222            
223            /**
224             * @see railo.runtime.spooler.SpoolerEngine#getClosedTasks()
225             */
226            public SpoolerTask[] getClosedTasks() {
227                    throw new PageRuntimeException(new ApplicationException("this method is no longer supported"));
228            }
229            
230            // FUTURE add to interface
231            public Query getOpenTasksAsQuery(int startrow, int maxrow) throws PageException {
232                    return getTasksAsQuery(createQuery(),openDirectory,startrow, maxrow);
233            }
234    
235            public Query getClosedTasksAsQuery(int startrow, int maxrow) throws PageException {
236                    return getTasksAsQuery(createQuery(),closedDirectory,startrow, maxrow);
237            }
238    
239            public Query getAllTasksAsQuery(int startrow, int maxrow) throws PageException {
240                    Query query = createQuery();
241                    //print.o(startrow+":"+maxrow);
242                    getTasksAsQuery(query,openDirectory,startrow, maxrow);
243                    int records = query.getRecordcount();
244                    if(maxrow<0) maxrow=Integer.MAX_VALUE;
245                    // no open tasks
246                    if(records==0) {
247                            startrow-=getOpenTaskCount();
248                            if(startrow<1) startrow=1;
249                    }
250                    else {
251                            startrow=1;
252                            maxrow-=records;
253                    }
254                    if(maxrow>0)getTasksAsQuery(query,closedDirectory,startrow, maxrow);
255                    return query;
256            }
257            
258            public int getOpenTaskCount() {
259                    return calculateSize(openDirectory);
260            }
261            
262            public int getClosedTaskCount() {
263                    return calculateSize(closedDirectory);
264            }
265            
266            
267            private Query getTasksAsQuery(Query qry,Resource dir, int startrow, int maxrow) throws PageException {
268                    String[] children = dir.list(FILTER);
269                    if(ArrayUtil.isEmpty(children)) return qry;
270                    if(children.length<maxrow)maxrow=children.length;
271                    SpoolerTask task;
272                    
273                    int to=startrow+maxrow;
274                    if(to>children.length)to=children.length;
275                    if(startrow<1)startrow=1;
276                    
277                    for(int i=startrow-1;i<to;i++){
278                            task = getTaskByName(dir, children[i]);
279                            if(task!=null)addQueryRow(qry, task);
280                    }
281                    
282                    return qry;
283            }
284            
285            private Query createQuery() throws DatabaseException {
286                    String v="VARCHAR";
287                    String d="DATE";
288                    railo.runtime.type.Query qry=new QueryImpl(
289                                    new String[]{"type","name","detail","id","lastExecution","nextExecution","closed","tries","exceptions","triesmax"},
290                                    new String[]{v,v,"object",v,d,d,"boolean","int","object","int"},
291                                    0,"query");
292                    return qry;
293            }
294            
295            private void addQueryRow(railo.runtime.type.Query qry, SpoolerTask task) throws PageException {
296            int row = qry.addRow();
297                    try{
298                            qry.setAt(KeyImpl.TYPE, row, task.getType());
299                            qry.setAt(KeyImpl.NAME, row, task.subject());
300                            qry.setAt(KeyImpl.DETAIL, row, task.detail());
301                            qry.setAt(KeyImpl.ID, row, task.getId());
302    
303                            
304                            qry.setAt(LAST_EXECUTION, row,new DateTimeImpl(task.lastExecution(),true));
305                            qry.setAt(NEXT_EXECUTION, row,new DateTimeImpl(task.nextExecution(),true));
306                            qry.setAt(CLOSED, row,Caster.toBoolean(task.closed()));
307                            qry.setAt(TRIES, row,Caster.toDouble(task.tries()));
308                            qry.setAt(TRIES_MAX, row,Caster.toDouble(task.tries()));
309                            qry.setAt(KeyImpl.EXCEPTIONS, row,translateTime(task.getExceptions()));
310                            
311                            int triesMax=0;
312                            ExecutionPlan[] plans = task.getPlans();
313                            for(int y=0;y<plans.length;y++) {
314                                    triesMax+=plans[y].getTries();
315                            }
316                            qry.setAt(TRIES_MAX, row,Caster.toDouble(triesMax));
317                    }
318                    catch(Throwable t){}
319            }
320            
321            private Array translateTime(Array exp) {
322                    exp=(Array) exp.duplicate(true);
323                    Iterator it = exp.iterator();
324                    Struct sct;
325                    while(it.hasNext()) {
326                            sct=(Struct) it.next();
327                            sct.setEL(KeyImpl.TIME,new DateTimeImpl(Caster.toLongValue(sct.get(KeyImpl.TIME,null),0),true));
328                    }
329                    return exp;
330            }
331    
332    
333            /* *
334             * @see railo.runtime.spooler.SpoolerEngine#getOpenTasks()
335            
336            public SpoolerTask[] getOpenTasks() {
337                    if(openTasks.size()==0) return new SpoolerTask[0];
338                    return (SpoolerTask[]) openTasks.toArray(new SpoolerTask[openTasks.size()]);
339            } */
340            
341            /* *
342             * @see railo.runtime.spooler.SpoolerEngine#getClosedTasks()
343             
344            public SpoolerTask[] getClosedTasks() {
345                    if(closedTasks.size()==0) return new SpoolerTask[0];
346                    return (SpoolerTask[]) closedTasks.toArray(new SpoolerTask[closedTasks.size()]);
347            }*/
348            
349    
350            /*public static void list(SpoolerTask[] tasks) {
351                    for(int i=0;i<tasks.length;i++) {
352                            aprint.out(tasks[i].subject());
353                            aprint.out("- last exe:"+tasks[i].lastExecution());
354                            aprint.out("- tries:"+tasks[i].tries());
355                    }
356            }*/
357            
358            class SpoolerThread extends Thread {
359    
360                    private SpoolerEngineImpl engine;
361                    private boolean sleeping;
362                    private final int maxThreads;
363    
364                    public SpoolerThread(SpoolerEngineImpl engine) {
365                            this.maxThreads=engine.getMaxThreads();
366                            this.engine=engine;
367                            try{
368                                    this.setPriority(MIN_PRIORITY);
369                            }
370                            // can throw security exceptions
371                            catch(Throwable t){}
372                    }
373                    
374                    public void run() {
375                            String[] taskNames;
376                            //SpoolerTask[] tasks;
377                            SpoolerTask task=null;
378                            long nextExection;
379                            ThreadLocalConfig.register(engine.config);
380                            //ThreadLocalPageContext.register(engine.);
381                            List<TaskThread> runningTasks=new ArrayList<TaskThread>();
382                            TaskThread tt;
383                            int adds;
384                            
385                            while(getOpenTaskCount()>0) {
386                                    adds=engine.adds();
387                                    taskNames = openDirectory.list(FILTER);
388                                    //tasks=engine.getOpenTasks();
389                                    nextExection=Long.MAX_VALUE;
390                                    for(int i=0;i<taskNames.length;i++) {
391                                            task=getTaskByName(openDirectory, taskNames[i]);
392                                            if(task==null) continue;
393                                            
394                                            if(task.nextExecution()<=System.currentTimeMillis()) {
395                                                    //print.o("- execute");
396                                                    tt=new TaskThread(engine,task);
397                                                    tt.start();
398                                                    runningTasks.add(tt);
399                                            }
400                                            else if(task.nextExecution()<nextExection && 
401                                                            nextExection!=-1 && 
402                                                            !task.closed()) 
403                                                    nextExection=task.nextExecution();
404                                            nextExection=joinTasks(runningTasks,maxThreads,nextExection);
405                                    }
406                                    
407                                    nextExection=joinTasks(runningTasks,0,nextExection);
408                                    if(adds!=engine.adds()) continue;
409                                    
410                                    if(nextExection==Long.MAX_VALUE)break;
411                                    long sleep = nextExection-System.currentTimeMillis();
412                                    
413                                    //print.o("sleep:"+sleep+">"+(sleep/1000));
414                                    if(sleep>0)doWait(sleep);
415                                    
416                                    //if(sleep<0)break;
417                            }
418                            //print.o("end:"+getOpenTaskCount());
419                    }
420    
421                    private long joinTasks(List<TaskThread> runningTasks, int maxThreads,long nextExection) {
422                            if(runningTasks.size()>=maxThreads){
423                                    Iterator<TaskThread> it = runningTasks.iterator();
424                                    TaskThread tt;
425                                    SpoolerTask task;
426                                    while(it.hasNext()){
427                                            tt = it.next();
428                                            SystemUtil.join(tt);
429                                            task = tt.getTask();
430    
431                                            if(task!=null && task.nextExecution()!=-1 && task.nextExecution()<nextExection && !task.closed()) {
432                                                    nextExection=task.nextExecution();
433                                            }
434                                    }
435                                    runningTasks.clear();
436                            }
437                            return nextExection;
438                    }
439    
440                    private void doWait(long sleep) {
441                            //long start=System.currentTimeMillis();
442                            try {
443                                    sleeping=true;
444                                    synchronized (this) {
445                                            wait(sleep);
446                                    }
447                                    
448                            } catch (Throwable t) {
449                                    //
450                            }
451                            finally {
452                                    sleeping=false;
453                            }
454                            //print.out(sleep+":"+(System.currentTimeMillis()-start));
455                    }
456                    
457            }
458            
459            
460            class TaskThread extends Thread {
461                    
462                    private SpoolerEngineImpl engine;
463                    private SpoolerTask task;
464    
465                    public TaskThread(SpoolerEngineImpl engine,SpoolerTask task) {
466                            this.engine=engine;
467                            this.task=task;
468                    }
469                    
470                    public SpoolerTask getTask() {
471                            return task;
472                    }
473    
474                    public void run() {
475                            ThreadLocalConfig.register(engine.config);
476                            engine.execute(task);
477                            ThreadLocalConfig.release();
478                            
479                    }
480            }
481            
482    
483            /**
484             * remove that task from Spooler
485             * @param task
486             */
487            public void remove(SpoolerTask task) {
488                    unstore(task);
489                    //if(!openTasks.remove(task))closedTasks.remove(task);
490            }
491            
492            public void removeAll() {
493                    ResourceUtil.removeChildrenEL(openDirectory);
494                    ResourceUtil.removeChildrenEL(closedDirectory);
495                    SystemUtil.sleep(100);
496                    ResourceUtil.removeChildrenEL(openDirectory);
497                    ResourceUtil.removeChildrenEL(closedDirectory);
498            }
499            
500    
501            /* *
502             * @see railo.runtime.spooler.SpoolerEngine#hasAdds()
503             */
504            public int adds() {
505                    //return openTasks.size()>0;
506                    return add;
507            }    
508            
509            /* *
510             * @see railo.runtime.spooler.SpoolerEngine#resetAdds()
511             * /
512            public void resetAdds() {
513                    add=false;
514            }*/
515    
516            /**
517             * @see railo.runtime.spooler.SpoolerEngine#remove(java.lang.String)
518             */
519            public void remove(String id) {
520                    SpoolerTask task = getTaskById(openDirectory,id);
521                    if(task==null)task=getTaskById(closedDirectory,id);
522                    if(task!=null)remove(task);
523            }
524    
525            /*private SpoolerTask getTaskById(SpoolerTask[] tasks, String id) {
526                    for(int i=0;i<tasks.length;i++) {
527                            if(tasks[i].getId().equals(id)) {
528                                    return tasks[i];
529                            }
530                    }
531                    return null;
532            }*/
533    
534            /**
535             * execute task by id and return eror throwd by task
536             * @param id
537             * @throws SpoolerException
538             */
539            public PageException execute(String id) {
540                    SpoolerTask task = getTaskById(openDirectory,id);
541                    if(task==null)task=getTaskById(closedDirectory,id);
542                    if(task!=null){
543                            return execute(task);
544                    }
545                    return null;
546            }
547            
548            public PageException execute(SpoolerTask task) {
549                    //task.closed();
550                    try {
551                            ((SpoolerTaskSupport)task)._execute(config);
552                            //if(task.closed())closedTasks.remove(task);
553                            //else openTasks.remove(task);
554                            
555                            unstore(task);
556                            log.info("remote-client", task.subject());
557                            task.setLastExecution(System.currentTimeMillis());
558                            task.setNextExecution(-1);
559                            
560                            task.setClosed(true);
561                            task=null;
562                    } 
563                    catch(Throwable t) {
564                            task.setLastExecution(System.currentTimeMillis());
565                            task.setNextExecution(calculateNextExecution(task));
566                            log.error("remote-client", task.subject()+":"+t.getMessage());
567                            if(task.nextExecution()==-1) {
568                                    //openTasks.remove(task);
569                                    //if(!closedTasks.contains(task))closedTasks.add(task);
570                                    unstore(task);
571                                    task.setClosed(true);
572                                    store(task);
573                                    task=null;
574                            }
575                            else 
576                                    store(task);
577                            
578                            return Caster.toPageException(t);
579                    }
580                    return null;
581            }
582    
583            /**
584             * @see railo.runtime.spooler.SpoolerEngine#setLabel(java.lang.String)
585             */
586            public void setLabel(String label) {
587                    this.label = label;
588            }
589    
590            /**
591             * @see railo.runtime.spooler.SpoolerEngine#setPersisDirectory(railo.commons.io.res.Resource)
592             */
593            public void setPersisDirectory(Resource persisDirectory) {
594                    this.persisDirectory = persisDirectory;
595            }
596    
597            /**
598             * @see railo.runtime.spooler.SpoolerEngine#setLog(railo.commons.io.log.Log)
599             */
600            public void setLog(Log log) {
601                    this.log = log;
602            }
603    
604            /**
605             * @see railo.runtime.spooler.SpoolerEngine#setConfig(railo.runtime.config.Config)
606             */
607            public void setConfig(Config config) {
608                    this.config = config;
609            }
610            
611    }
612            
613    class TaskFileFilter implements ResourceNameFilter {
614    
615            public boolean accept(Resource parent, String name) {
616                    return name!=null && name.endsWith(".tsk");
617            }
618            
619    }