Main Page   Modules   Class Hierarchy   Compound List   File List   Compound Members   File Members  

outchannels.cpp

00001 /* MuSE - Multiple Streaming Engine
00002  * Copyright (C) 2000-2003 Denis Rojo aka jaromil <jaromil@dyne.org>
00003  *
00004  * This source code is free software; you can redistribute it and/or
00005  * modify it under the terms of the GNU Public License as published 
00006  * by the Free Software Foundation; either version 2 of the License,
00007  * or (at your option) any later version.
00008  *
00009  * This source code is distributed in the hope that it will be useful,
00010  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00011  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
00012  * Please refer to the GNU Public License for more details.
00013  *
00014  * You should have received a copy of the GNU Public License along with
00015  * this source code; if not, write to:
00016  * Free Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
00017  *
00018  * "$Id: outchannels.cpp,v 1.6 2004/04/15 13:12:15 jaromil Exp $"
00019  *
00020  */
00021 
00022 #include <iostream>
00023 #include <math.h>
00024 #include <stdio.h>
00025 #include <stdlib.h>
00026 #include <string.h>
00027 #include <sys/socket.h>
00028 #include <netinet/in.h>
00029 #include <arpa/inet.h>
00030 #include <netdb.h>
00031 #include <sys/types.h>
00032 #include <sys/stat.h>
00033 #include <fcntl.h>
00034 #include <unistd.h>
00035 #include <errno.h>
00036 
00037 #include <generic.h>
00038 #include <outchannels.h>
00039 #include <jutils.h>
00040 #include <config.h>
00041 
00042 #ifdef HAVE_LAME
00043 #include <out_lame.h>
00044 #endif
00045 
00046 #ifdef HAVE_VORBIS
00047 #include <out_vorbis.h>
00048 #endif
00049 
00050 
00051 /* I think a tighter bound could be:  (mt, March 2000)
00052  * MPEG1:
00053  *    num_samples*(bitrate/8)/samplerate + 4*1152*(bitrate/8)/samplerate + 512
00054  * MPEG2:
00055  *    num_samples*(bitrate/8)/samplerate + 4*576*(bitrate/8)/samplerate + 256
00056  */
00057 
00058 /* mp3 encoder */
00059 
00060 
00061 OutChannel::OutChannel(char *myname)
00062   : Entry() {
00063   func("OutChannel::OutChannel(%s) %p",myname,this);
00064 
00065   quit = false;
00066   initialized = false;
00067   running = false;
00068   sprintf(name,"%s",myname);
00069   sprintf(version, "  ");
00070   encoded = 0;
00071   fd = NULL;
00072   _thread_initialized = false;
00073   _thread_init();
00074 
00075   idseed = 0;
00076 
00077   erbapipa = new Pipe(OUT_PIPESIZE);
00078 
00079   /* setup defaults */
00080   bps(24);
00081   freq(22050);
00082   channels(1);
00083   quality(4.0);
00084   lowpass(0);
00085   highpass(0);
00086     
00087   //  profile_changed = true;
00088 
00089   guess_bps();
00090 }
00091 
00092 OutChannel::~OutChannel() {
00093   func("OutChannel::~OutChannel");
00094 
00095   
00096   quit = true;
00097   unlock();// signal();  
00098 
00099   initialized = false;
00100 
00101   jsleep(0,50);
00102 
00103   Shouter *ice = (Shouter*)icelist.begin();
00104   lock_ice();
00105   while(ice) {
00106     icelist.rem(1);
00107     delete ice;
00108     ice = (Shouter*)icelist.begin();
00109   }
00110   unlock_ice();
00111 
00112   delete erbapipa;
00113 
00114   if(fd) dump_stop();
00115 
00116 
00117   /* QUAAAA */
00118   //_thread_destroy();
00119 }
00120 
00121 void OutChannel::start() {
00122   Shouter *ice = (Shouter*) icelist.begin();
00123   lock_ice();
00124   while(ice) {
00125     if( ice->apply_profile() )
00126       ice->start();
00127     ice = (Shouter*) ice->next;
00128   }
00129   unlock_ice();
00130   pthread_create(&_thread, &_attr, &kickoff, this);
00131 }
00132 
00133 void OutChannel::run() {
00134   int res;
00135   /*
00136   if(!initialized) {
00137     warning("OutChannel::run() : output channel uninitialized, thread won't start");
00138     return;
00139   }
00140   */
00141 
00142   running = true;
00143   while(!quit) {
00144 
00145     /* check if we must encode */
00146     encoding = false;
00147     if(fd) encoding = true;
00148     Shouter *ice = (Shouter*)icelist.begin();
00149     while(ice) {
00150       if(ice->running) encoding = true;
00151       ice = (Shouter*)ice->next;
00152     }
00153     if(!initialized) encoding = false;
00154     
00155     if(!encoding) {
00156       jsleep(0,50);
00157       shout(); /* in case there are waiting retries */
00158       if(quit) break;
00159       continue;
00160     } else jsleep(0,5); /* avoid a tight loop */
00161 
00162     /* erbapipa sucking is now done in instantiated classes
00163        inside the encode() method
00164        (see vorbis and lame classes)
00165     */
00166 
00167     encoded = 0;
00168 
00169     lock();
00170     encode();
00171     unlock();
00172 
00173     if(encoded<1) continue;
00174     
00175 
00176     calc_bitrate(encoded);
00177     /* stream it to the net */
00178     res = shout();
00179 
00180     /* save it on the harddisk */
00181     res = dump();
00182 
00183     /* TODO: flush when erbapipa->read != OUT_CHUNK */
00184     
00185   }
00186 
00187   running = false;
00188 }
00189 
00190 int OutChannel::create_ice() {
00191   Shouter *ice = new Shouter();
00192 
00193   if(!ice) {
00194     error("can't create icecast shouter");
00195     return -1;
00196   }
00197 
00198   /* the icecast id is the position in the linklist */
00199   lock_ice();
00200   icelist.add(ice);
00201   idseed++;
00202   ice->id = id + idseed; 
00203 
00204 
00205   switch(tipo) {
00206 
00207   case MP3:
00208     ice->format = SHOUT_FORMAT_MP3;
00209     ice->login(SHOUT_PROTOCOL_HTTP);
00210     break;
00211 
00212   case OGG:
00213     ice->format = SHOUT_FORMAT_VORBIS;
00214     ice->login(SHOUT_PROTOCOL_HTTP);
00215     break;
00216 
00217   default:
00218     error("codec is not streamable");
00219     delete ice;
00220     return -1;
00221   }
00222 
00223   ice->apply_profile();
00224 
00225   unlock_ice();
00226   func("outchannel id %i creates new shouter %p with id %i",id,ice,ice->id);
00227   return ice->id;
00228 }
00229 
00230 bool OutChannel::delete_ice(int iceid) {
00231 
00232   Shouter *ice = get_ice(iceid);
00233   if(!ice) {
00234     warning("OutChannel::delete_ice(%i) : invalid id",iceid);
00235     return false;
00236   }
00237 
00238   if(ice) {
00239     lock_ice();
00240     ice->rem();
00241     if(ice->running) ice->stop();
00242     delete ice;
00243     unlock_ice();
00244   }
00245 
00246   func("outchannel id %i deleted shouter id %i",id,iceid);
00247   return true;
00248 }
00249 
00250 Shouter *OutChannel::get_ice(int iceid) {
00251   return (Shouter*)icelist.pick_id(iceid);
00252 }
00253 
00254 bool OutChannel::apply_ice(int iceid) {
00255   bool res = false;
00256   
00257   Shouter *ice = get_ice(iceid);
00258 
00259   if(ice) {
00260     lock_ice();
00261     res = ice->apply_profile();
00262     unlock_ice();
00263   }
00264   return res;
00265 }
00266 
00267 bool OutChannel::connect_ice(int iceid, bool on) {
00268   bool res = false;
00269   func("OutChannel::connect_ice(%i,%i)",iceid,on);
00270   Shouter *ice = get_ice(iceid);
00271   if(!ice) { 
00272     error("Outchannel::connect_ice : can't find shouter with id %i",iceid);
00273     return false;
00274   }
00275 
00276   lock_ice();
00277   res = (on) ? ice->start() : ice->stop();
00278   unlock_ice();
00279   
00280   return res;
00281 }
00282 
00283 int OutChannel::shout() {
00284   int res, sentout = 0;
00285   time_t now = time(NULL);
00286   lock_ice();
00287   Shouter *ice = (Shouter*)icelist.begin();
00288   while(ice) {
00289     if(ice->running) {
00290       res = ice->send(buffer,encoded);
00291       if(res<0) { /* there is an error: -1=temporary , -2=fatal */
00292         if(res==-1) { ice = (Shouter*)ice->next; continue; }
00293         if(res==-2) {
00294           error("fatal error on stream to %s:%u",ice->host(),ice->port());
00295           ice->stop();
00296           notice("retrying to connect to %s:%u%s after %i seconds",
00297                  ice->host(), ice->port(), ice->mount(), RETRY_DELAY);
00298           ice->retry = now;
00299           //      ice = (Shouter*)ice->next;
00300           //      continue;
00301         }
00302       } else sentout += res;
00303     } else if(ice->retry>0) {
00304       if((now - ice->retry) > RETRY_DELAY) {
00305         notice("try to reconnect to %s:%u%s",
00306                ice->host(), ice->port(), ice->mount());
00307         if( ice->start() ) ice->retry = 0;
00308         else ice->retry = now;
00309       }
00310     }
00311     ice = (Shouter*)ice->next;
00312   }
00313   unlock_ice();
00314   return sentout;
00315 }
00316 
00317 bool OutChannel::dump_start(char *file) {
00318   struct stat st;
00319   char temp[MAX_PATH_SIZE];
00320   int num = 0;
00321   
00322   if(fd) {
00323     warning("%s channel allready dumping to %s",name,fd_name);
00324     return false;
00325   }
00326 
00327   /* avoid to overwrite existent files */
00328   snprintf(temp,MAX_PATH_SIZE,"%s",file);
00329   while(stat(temp, &st) != -1) {
00330     /* file EXIST */
00331     num++;
00332     snprintf(temp,MAX_PATH_SIZE,"%s.%i",file,num);
00333   }
00334 
00335   fd = fopen(temp,"wb"); /* writeonly nonblocking binary (-rw-rw-r--) */
00336   if(!fd) {
00337     error("%s channel can't open %s for writing",name,temp);
00338     act("%s",strerror(errno));
00339     return(false);
00340   }
00341 
00342   strncpy(fd_name,temp,MAX_PATH_SIZE);
00343   notice("%s channel dumping to file %s",name,fd_name);
00344 
00345   return true;
00346 }
00347 
00348 bool OutChannel::dump_stop() {
00349   func("OutChanne::dump_stop()");
00350   if(!fd) {
00351     warning("%s channel is not dumping to any file",name);
00352     return false;
00353   }
00354 
00355   fflush(fd);
00356   
00357   act("%s channel stops dumping to %s",name,fd_name);
00358   
00359   fclose(fd);
00360   fd = NULL;
00361 
00362   return true;
00363 }
00364 
00365 bool OutChannel::dump() {
00366   int res;
00367   if(!fd) return false;
00368   func("OutChannel::dump() encoded %i",encoded);
00369   fflush(fd);
00370   if(!encoded) return true;
00371   res = fwrite(buffer,1,encoded,fd);
00372   if(res != encoded)
00373     warning("skipped %u bytes dumping to file %s",encoded - res,fd_name);
00374   return true;
00375 }
00376 /*
00377 void OutChannel::bps(int in) {
00378   _bps = in;
00379   Shouter *ice = (Shouter*)icelist.begin();
00380   while(ice) {
00381     ice->_bps = in;
00382     ice = (Shouter*)ice->next;
00383   }
00384 }
00385 */
00386 bool OutChannel::calc_bitrate(int enc) {
00387   /* calcolates bitrate */
00388   bytes_accu += enc;
00389   now = dtime();
00390   if((now-prev)>1) { /* if one second passed */
00391     bitrate = (bytes_accu<<2);
00392     bytes_accu = 0;
00393     prev = now;
00394     return true;
00395   }
00396   return false;
00397 }
00398 
00399 char *OutChannel::guess_bps() {
00400   int q = (int)fabs(quality());
00401 
00402   //  if(channels()<1) channels(1);
00403   //  if(channels()>2) channels(2);
00404 
00405   switch(q) {
00406 
00407 #define BPSVAL(b,f) \
00408 bps(b); freq(f); \
00409 snprintf(quality_desc,256,"%uKbit/s %uHz",bps(),freq());
00410 //if(bps()==0) bps(b); if(freq()==0) freq(f);
00411   
00412   case 0: BPSVAL(8,11025); break;
00413   case 1: BPSVAL(16,16000); break;
00414   case 2: BPSVAL(16,22050); break;
00415   case 3: BPSVAL(24,16000); break;
00416   case 4: BPSVAL(24,22050); break;
00417   case 5: BPSVAL(48,22050); break;
00418   case 6: BPSVAL(56,22050); break;
00419   case 7: BPSVAL(64,44100); break;
00420   case 8: BPSVAL(96,44100); break;
00421   case 9: BPSVAL(128,44100); break;
00422 
00423   }
00424   return quality_desc;
00425 }
00426 
00427 void OutChannel::push(void *data, int len) {
00428   int errors = 0;
00429   /* check out if encoders are configured */
00430   if(!encoding) return;
00431   if(!initialized) return;
00432 
00433   /* push in data
00434      wait if pipe is full or occupied
00435      returns the right thing */
00436   //  func("PID %i wants to push %i bytes in %s",getpid(),len,name);
00437   //  func("pipe has %i free space",erbapipa->space());
00438   while( running && 
00439          erbapipa->write(len,data) < 0 ) {
00440     //    func("PID %i waits to push %i bytes in %s",getpid(),len,name);
00441     jsleep(0,30);
00442     errors++;
00443     if(errors>20) {
00444       warning("%s encoder is stuck, pipe is full",name);
00445       return;
00446     }
00447   }
00448   //  func("ok, %i bytes pushed succesfully in %s",len,name);
00449 }
00450 
00451 /* thread stuff */
00452 
00453 void OutChannel::_thread_init() {
00454   if(_thread_initialized) return;
00455 
00456   func("OutChannel::thread_init()");
00457   if(pthread_mutex_init (&_mutex,NULL) == -1)
00458     error("error initializing POSIX thread mutex");
00459   if(pthread_mutex_init (&_mutex_ice,NULL) == -1)
00460     error("error initializing POSIX thread mutex");
00461   if(pthread_cond_init (&_cond, NULL) == -1)
00462     error("error initializing POSIX thread condition"); 
00463   if(pthread_attr_init (&_attr) == -1)
00464     error("error initializing POSIX thread attribute");
00465   
00466   /* set the thread as detached
00467      see: man pthread_attr_init(3) */
00468   pthread_attr_setdetachstate(&_attr,PTHREAD_CREATE_DETACHED);
00469 
00470   _thread_initialized = true;
00471 }
00472 
00473 void OutChannel::_thread_destroy() {
00474   if(!_thread_initialized) return;
00475   
00476   /* we signal and then we check the thread
00477      exited by locking the conditional */
00478   if(running) {
00479     signal();
00480     lock(); unlock();
00481   }
00482 
00483   if(pthread_mutex_destroy(&_mutex) == -1)
00484     error("error destroying POSIX thread mutex");
00485   if(pthread_cond_destroy(&_cond) == -1)
00486     error("error destroying POSIX thread condition");
00487   if(pthread_attr_destroy(&_attr) == -1)
00488     error("error destroying POSIX thread attribute");
00489   _thread_initialized = false;
00490 }

Generated on Sat Apr 17 17:38:49 2004 for MuSE by doxygen1.3