00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include <iostream>
00023 #include <math.h>
00024 #include <stdio.h>
00025 #include <stdlib.h>
00026 #include <string.h>
00027 #include <sys/socket.h>
00028 #include <netinet/in.h>
00029 #include <arpa/inet.h>
00030 #include <netdb.h>
00031 #include <sys/types.h>
00032 #include <sys/stat.h>
00033 #include <fcntl.h>
00034 #include <unistd.h>
00035 #include <errno.h>
00036
00037 #include <generic.h>
00038 #include <outchannels.h>
00039 #include <jutils.h>
00040 #include <config.h>
00041
00042 #ifdef HAVE_LAME
00043 #include <out_lame.h>
00044 #endif
00045
00046 #ifdef HAVE_VORBIS
00047 #include <out_vorbis.h>
00048 #endif
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061 OutChannel::OutChannel(char *myname)
00062 : Entry() {
00063 func("OutChannel::OutChannel(%s) %p",myname,this);
00064
00065 quit = false;
00066 initialized = false;
00067 running = false;
00068 sprintf(name,"%s",myname);
00069 sprintf(version, " ");
00070 encoded = 0;
00071 fd = NULL;
00072 _thread_initialized = false;
00073 _thread_init();
00074
00075 idseed = 0;
00076
00077 erbapipa = new Pipe(OUT_PIPESIZE);
00078
00079
00080 bps(24);
00081 freq(22050);
00082 channels(1);
00083 quality(4.0);
00084 lowpass(0);
00085 highpass(0);
00086
00087
00088
00089 guess_bps();
00090 }
00091
00092 OutChannel::~OutChannel() {
00093 func("OutChannel::~OutChannel");
00094
00095
00096 quit = true;
00097 unlock();
00098
00099 initialized = false;
00100
00101 jsleep(0,50);
00102
00103 Shouter *ice = (Shouter*)icelist.begin();
00104 lock_ice();
00105 while(ice) {
00106 icelist.rem(1);
00107 delete ice;
00108 ice = (Shouter*)icelist.begin();
00109 }
00110 unlock_ice();
00111
00112 delete erbapipa;
00113
00114 if(fd) dump_stop();
00115
00116
00117
00118
00119 }
00120
00121 void OutChannel::start() {
00122 Shouter *ice = (Shouter*) icelist.begin();
00123 lock_ice();
00124 while(ice) {
00125 if( ice->apply_profile() )
00126 ice->start();
00127 ice = (Shouter*) ice->next;
00128 }
00129 unlock_ice();
00130 pthread_create(&_thread, &_attr, &kickoff, this);
00131 }
00132
00133 void OutChannel::run() {
00134 int res;
00135
00136
00137
00138
00139
00140
00141
00142 running = true;
00143 while(!quit) {
00144
00145
00146 encoding = false;
00147 if(fd) encoding = true;
00148 Shouter *ice = (Shouter*)icelist.begin();
00149 while(ice) {
00150 if(ice->running) encoding = true;
00151 ice = (Shouter*)ice->next;
00152 }
00153 if(!initialized) encoding = false;
00154
00155 if(!encoding) {
00156 jsleep(0,50);
00157 shout();
00158 if(quit) break;
00159 continue;
00160 } else jsleep(0,5);
00161
00162
00163
00164
00165
00166
00167 encoded = 0;
00168
00169 lock();
00170 encode();
00171 unlock();
00172
00173 if(encoded<1) continue;
00174
00175
00176 calc_bitrate(encoded);
00177
00178 res = shout();
00179
00180
00181 res = dump();
00182
00183
00184
00185 }
00186
00187 running = false;
00188 }
00189
00190 int OutChannel::create_ice() {
00191 Shouter *ice = new Shouter();
00192
00193 if(!ice) {
00194 error("can't create icecast shouter");
00195 return -1;
00196 }
00197
00198
00199 lock_ice();
00200 icelist.add(ice);
00201 idseed++;
00202 ice->id = id + idseed;
00203
00204
00205 switch(tipo) {
00206
00207 case MP3:
00208 ice->format = SHOUT_FORMAT_MP3;
00209 ice->login(SHOUT_PROTOCOL_HTTP);
00210 break;
00211
00212 case OGG:
00213 ice->format = SHOUT_FORMAT_VORBIS;
00214 ice->login(SHOUT_PROTOCOL_HTTP);
00215 break;
00216
00217 default:
00218 error("codec is not streamable");
00219 delete ice;
00220 return -1;
00221 }
00222
00223 ice->apply_profile();
00224
00225 unlock_ice();
00226 func("outchannel id %i creates new shouter %p with id %i",id,ice,ice->id);
00227 return ice->id;
00228 }
00229
00230 bool OutChannel::delete_ice(int iceid) {
00231
00232 Shouter *ice = get_ice(iceid);
00233 if(!ice) {
00234 warning("OutChannel::delete_ice(%i) : invalid id",iceid);
00235 return false;
00236 }
00237
00238 if(ice) {
00239 lock_ice();
00240 ice->rem();
00241 if(ice->running) ice->stop();
00242 delete ice;
00243 unlock_ice();
00244 }
00245
00246 func("outchannel id %i deleted shouter id %i",id,iceid);
00247 return true;
00248 }
00249
00250 Shouter *OutChannel::get_ice(int iceid) {
00251 return (Shouter*)icelist.pick_id(iceid);
00252 }
00253
00254 bool OutChannel::apply_ice(int iceid) {
00255 bool res = false;
00256
00257 Shouter *ice = get_ice(iceid);
00258
00259 if(ice) {
00260 lock_ice();
00261 res = ice->apply_profile();
00262 unlock_ice();
00263 }
00264 return res;
00265 }
00266
00267 bool OutChannel::connect_ice(int iceid, bool on) {
00268 bool res = false;
00269 func("OutChannel::connect_ice(%i,%i)",iceid,on);
00270 Shouter *ice = get_ice(iceid);
00271 if(!ice) {
00272 error("Outchannel::connect_ice : can't find shouter with id %i",iceid);
00273 return false;
00274 }
00275
00276 lock_ice();
00277 res = (on) ? ice->start() : ice->stop();
00278 unlock_ice();
00279
00280 return res;
00281 }
00282
00283 int OutChannel::shout() {
00284 int res, sentout = 0;
00285 time_t now = time(NULL);
00286 lock_ice();
00287 Shouter *ice = (Shouter*)icelist.begin();
00288 while(ice) {
00289 if(ice->running) {
00290 res = ice->send(buffer,encoded);
00291 if(res<0) {
00292 if(res==-1) { ice = (Shouter*)ice->next; continue; }
00293 if(res==-2) {
00294 error("fatal error on stream to %s:%u",ice->host(),ice->port());
00295 ice->stop();
00296 notice("retrying to connect to %s:%u%s after %i seconds",
00297 ice->host(), ice->port(), ice->mount(), RETRY_DELAY);
00298 ice->retry = now;
00299
00300
00301 }
00302 } else sentout += res;
00303 } else if(ice->retry>0) {
00304 if((now - ice->retry) > RETRY_DELAY) {
00305 notice("try to reconnect to %s:%u%s",
00306 ice->host(), ice->port(), ice->mount());
00307 if( ice->start() ) ice->retry = 0;
00308 else ice->retry = now;
00309 }
00310 }
00311 ice = (Shouter*)ice->next;
00312 }
00313 unlock_ice();
00314 return sentout;
00315 }
00316
00317 bool OutChannel::dump_start(char *file) {
00318 struct stat st;
00319 char temp[MAX_PATH_SIZE];
00320 int num = 0;
00321
00322 if(fd) {
00323 warning("%s channel allready dumping to %s",name,fd_name);
00324 return false;
00325 }
00326
00327
00328 snprintf(temp,MAX_PATH_SIZE,"%s",file);
00329 while(stat(temp, &st) != -1) {
00330
00331 num++;
00332 snprintf(temp,MAX_PATH_SIZE,"%s.%i",file,num);
00333 }
00334
00335 fd = fopen(temp,"wb");
00336 if(!fd) {
00337 error("%s channel can't open %s for writing",name,temp);
00338 act("%s",strerror(errno));
00339 return(false);
00340 }
00341
00342 strncpy(fd_name,temp,MAX_PATH_SIZE);
00343 notice("%s channel dumping to file %s",name,fd_name);
00344
00345 return true;
00346 }
00347
00348 bool OutChannel::dump_stop() {
00349 func("OutChanne::dump_stop()");
00350 if(!fd) {
00351 warning("%s channel is not dumping to any file",name);
00352 return false;
00353 }
00354
00355 fflush(fd);
00356
00357 act("%s channel stops dumping to %s",name,fd_name);
00358
00359 fclose(fd);
00360 fd = NULL;
00361
00362 return true;
00363 }
00364
00365 bool OutChannel::dump() {
00366 int res;
00367 if(!fd) return false;
00368 func("OutChannel::dump() encoded %i",encoded);
00369 fflush(fd);
00370 if(!encoded) return true;
00371 res = fwrite(buffer,1,encoded,fd);
00372 if(res != encoded)
00373 warning("skipped %u bytes dumping to file %s",encoded - res,fd_name);
00374 return true;
00375 }
00376
00377
00378
00379
00380
00381
00382
00383
00384
00385
00386 bool OutChannel::calc_bitrate(int enc) {
00387
00388 bytes_accu += enc;
00389 now = dtime();
00390 if((now-prev)>1) {
00391 bitrate = (bytes_accu<<2);
00392 bytes_accu = 0;
00393 prev = now;
00394 return true;
00395 }
00396 return false;
00397 }
00398
00399 char *OutChannel::guess_bps() {
00400 int q = (int)fabs(quality());
00401
00402
00403
00404
00405 switch(q) {
00406
00407 #define BPSVAL(b,f) \
00408 bps(b); freq(f); \
00409 snprintf(quality_desc,256,"%uKbit/s %uHz",bps(),freq());
00410
00411
00412 case 0: BPSVAL(8,11025); break;
00413 case 1: BPSVAL(16,16000); break;
00414 case 2: BPSVAL(16,22050); break;
00415 case 3: BPSVAL(24,16000); break;
00416 case 4: BPSVAL(24,22050); break;
00417 case 5: BPSVAL(48,22050); break;
00418 case 6: BPSVAL(56,22050); break;
00419 case 7: BPSVAL(64,44100); break;
00420 case 8: BPSVAL(96,44100); break;
00421 case 9: BPSVAL(128,44100); break;
00422
00423 }
00424 return quality_desc;
00425 }
00426
00427 void OutChannel::push(void *data, int len) {
00428 int errors = 0;
00429
00430 if(!encoding) return;
00431 if(!initialized) return;
00432
00433
00434
00435
00436
00437
00438 while( running &&
00439 erbapipa->write(len,data) < 0 ) {
00440
00441 jsleep(0,30);
00442 errors++;
00443 if(errors>20) {
00444 warning("%s encoder is stuck, pipe is full",name);
00445 return;
00446 }
00447 }
00448
00449 }
00450
00451
00452
00453 void OutChannel::_thread_init() {
00454 if(_thread_initialized) return;
00455
00456 func("OutChannel::thread_init()");
00457 if(pthread_mutex_init (&_mutex,NULL) == -1)
00458 error("error initializing POSIX thread mutex");
00459 if(pthread_mutex_init (&_mutex_ice,NULL) == -1)
00460 error("error initializing POSIX thread mutex");
00461 if(pthread_cond_init (&_cond, NULL) == -1)
00462 error("error initializing POSIX thread condition");
00463 if(pthread_attr_init (&_attr) == -1)
00464 error("error initializing POSIX thread attribute");
00465
00466
00467
00468 pthread_attr_setdetachstate(&_attr,PTHREAD_CREATE_DETACHED);
00469
00470 _thread_initialized = true;
00471 }
00472
00473 void OutChannel::_thread_destroy() {
00474 if(!_thread_initialized) return;
00475
00476
00477
00478 if(running) {
00479 signal();
00480 lock(); unlock();
00481 }
00482
00483 if(pthread_mutex_destroy(&_mutex) == -1)
00484 error("error destroying POSIX thread mutex");
00485 if(pthread_cond_destroy(&_cond) == -1)
00486 error("error destroying POSIX thread condition");
00487 if(pthread_attr_destroy(&_attr) == -1)
00488 error("error destroying POSIX thread attribute");
00489 _thread_initialized = false;
00490 }