Pastebin
Retrouvez, créez et partagez vos snippets en temps réel.
Rechercher un Pastebin
Aucun paste trouvé.
Créer un paste
Pastebin
Blog
q
//claude opus - versione ROBUSTA con recovery USB #include <uhd/usrp/multi_usrp.hpp> #include <uhd/stream.hpp> #include <complex> #include <vector> #include <queue> #include <mutex> #include <atomic> #include <thread> #include <iostream> #include <csignal> #include <chrono> #include <algorithm> #include <uhd/utils/thread.hpp> #include <condition_variable> #include <iomanip> static std::atomic<bool> gstop(false); void sig_h(int) { gstop = true; } // Watchdog per rilevare thread bloccati class Watchdog { std::atomic<std::chrono::steady_clock::time_point> last_update; std::chrono::seconds timeout; std::string name; public: Watchdog(const std::string& n, int timeout_sec = 10) : name(n), timeout(timeout_sec) { touch(); } void touch() { last_update = std::chrono::steady_clock::now(); } bool is_alive() const { auto now = std::chrono::steady_clock::now(); auto last = last_update.load(); return (now - last) < timeout; } double seconds_since_update() const { auto now = std::chrono::steady_clock::now(); auto last = last_update.load(); return std::chrono::duration<double>(now - last).count(); } std::string get_name() const { return name; } }; template<typename T> class SafeQueue { std::queue<T> q; mutable std::mutex mx; std::condition_variable cv; size_t max_size; std::atomic<uint64_t> dropped{0}; public: SafeQueue(size_t max = 30) : max_size(max) {} bool push(T&& item) { std::lock_guard<std::mutex> lk(mx); if (q.size() >= max_size) { q.pop(); dropped++; } q.push(std::move(item)); cv.notify_one(); return true; } bool pop(T& item, int timeout_ms = 50) { std::unique_lock<std::mutex> lk(mx); if (!cv.wait_for(lk, std::chrono::milliseconds(timeout_ms), [this] { return !q.empty(); })) { return false; } item = std::move(q.front()); q.pop(); return true; } size_t size() const { std::lock_guard<std::mutex> lk(mx); return q.size(); } void clear() { std::lock_guard<std::mutex> lk(mx); while (!q.empty()) q.pop(); } uint64_t get_dropped() const { return dropped.load(); } }; int main(int argc, char** argv) { std::string s1, s2, clk = "external"; double r = 0, f1r = 0, f1t = 0, f2r = 0, f2t = 0; double g1r = 58, g1t = 29, g2r = 58, g2t = 29; size_t spb = 4096; // Parsing argomenti for (int i = 1; i < argc; i++) { std::string a = argv[i]; auto nd = [&](double& v) { if (i + 1 < argc) v = std::stod(argv[++i]); }; auto ns = [&](std::string& v) { if (i + 1 < argc) v = argv[++i]; }; if (a == "--serial-usrp1") ns(s1); else if (a == "--serial-usrp2") ns(s2); else if (a == "--freq-usrp1-rx") nd(f1r); else if (a == "--freq-usrp1-tx") nd(f1t); else if (a == "--freq-usrp2-rx") nd(f2r); else if (a == "--freq-usrp2-tx") nd(f2t); else if (a == "--rate") nd(r); else if (a == "--gain-usrp1-rx") nd(g1r); else if (a == "--gain-usrp1-tx") nd(g1t); else if (a == "--gain-usrp2-rx") nd(g2r); else if (a == "--gain-usrp2-tx") nd(g2t); else if (a == "--samps-per-buffer") { double t; nd(t); spb = size_t(t); } else if (a == "--clock-source") ns(clk); } if (s1.empty() || s2.empty() || f1r <= 0 || f1t <= 0 || f2r <= 0 || f2t <= 0 || r <= 0) { std::cerr << "Argomenti obbligatori mancanti.\n"; return 1; } std::signal(SIGINT, sig_h); uhd::set_thread_priority(); // Verifica ottimizzazioni USB std::cout << "\n=== Verifica Configurazione USB ===" << std::endl; std::ifstream usb_mem("/sys/module/usbcore/parameters/usbfs_memory_mb"); if (usb_mem) { int mem_mb; usb_mem >> mem_mb; std::cout << "USB Memory Buffer: " << mem_mb << " MB"; if (mem_mb < 1000) { std::cout << " ⚠️ BASSO! Raccomandato >= 1000 MB" << std::endl; std::cout << "Esegui: sudo sh -c 'echo 1000 > /sys/module/usbcore/parameters/usbfs_memory_mb'" << std::endl; } else { std::cout << " ✓" << std::endl; } } std::cout << "================================\n" << std::endl; // Creazione USRP con retry auto make_usrp_with_retry = [](const std::string& serial, int max_retries = 3) { uhd::device_addr_t dev_addr; dev_addr["serial"] = serial; dev_addr["recv_buff_size"] = "100000000"; dev_addr["send_buff_size"] = "100000000"; dev_addr["num_recv_frames"] = "256"; dev_addr["num_send_frames"] = "256"; for (int attempt = 1; attempt <= max_retries; attempt++) { try { std::cout << "Connessione USRP " << serial << " (tentativo " << attempt << ")..." << std::endl; auto usrp = uhd::usrp::multi_usrp::make(dev_addr); std::cout << " ✓ Connesso: " << usrp->get_mboard_name() << std::endl; return usrp; } catch (const std::exception& e) { std::cerr << " ✗ Errore: " << e.what() << std::endl; if (attempt < max_retries) { std::cout << " Riprovo tra 2 secondi..." << std::endl; std::this_thread::sleep_for(std::chrono::seconds(2)); } else { throw; } } } return uhd::usrp::multi_usrp::sptr(); }; auto u1 = make_usrp_with_retry(s1); auto u2 = make_usrp_with_retry(s2); // Configurazione auto setu = [&](uhd::usrp::multi_usrp::sptr u, double fr, double ft, double gR, double gT, const std::string& name) { u->set_clock_source(clk); std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Verifica lock se clock esterno if (clk != "internal") { try { auto sensors = u->get_mboard_sensor_names(0); if (std::find(sensors.begin(), sensors.end(), "ref_locked") != sensors.end()) { auto locked = u->get_mboard_sensor("ref_locked", 0); std::cout << name << " ref_locked: " << locked.to_pp_string() << std::endl; if (locked.to_bool() == false) { std::cerr << "⚠️ ATTENZIONE: Clock reference non locked!" << std::endl; } } } catch (...) { std::cerr << name << " impossibile verificare ref_locked" << std::endl; } } u->set_rx_rate(r); u->set_tx_rate(r); u->set_rx_freq(fr); u->set_tx_freq(ft); u->set_rx_gain(gR); u->set_tx_gain(gT); std::cout << name << " configurato: RX " << u->get_rx_freq()/1e6 << " MHz @ " << u->get_rx_rate()/1e6 << " Msps" << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(100)); }; std::cout << "\n=== Configurazione USRP ===" << std::endl; setu(u1, f1r, f1t, g1r, g1t, "USRP1"); setu(u2, f2r, f2t, g2r, g2t, "USRP2"); std::cout << "============================\n" << std::endl; // Stream uhd::stream_args_t sa("fc32", "sc16"); sa.channels = {0}; auto u1rx = u1->get_rx_stream(sa); auto u2rx = u2->get_rx_stream(sa); auto u1tx = u1->get_tx_stream(sa); auto u2tx = u2->get_tx_stream(sa); // Code e metriche SafeQueue<std::vector<std::complex<float>>> q12, q21; std::atomic<bool> running(true); std::atomic<uint64_t> rx1_count(0), rx2_count(0), tx1_count(0), tx2_count(0); std::atomic<uint64_t> overflow_count(0), underrun_count(0); std::atomic<uint64_t> rx1_errors(0), rx2_errors(0), tx1_errors(0), tx2_errors(0); std::atomic<bool> usb_error_detected(false); // Watchdogs Watchdog wd_rx1("RX1", 15); Watchdog wd_rx2("RX2", 15); Watchdog wd_tx1("TX1", 15); Watchdog wd_tx2("TX2", 15); // RX loop con error handling robusto auto rx_loop = [&](uhd::rx_streamer::sptr stream, SafeQueue<std::vector<std::complex<float>>>& queue, std::atomic<uint64_t>& counter, std::atomic<uint64_t>& error_counter, Watchdog& wd, const std::string& name) { std::vector<std::complex<float>> buf(spb); uhd::rx_metadata_t md; size_t consecutive_errors = 0; while (running && !gstop && !usb_error_detected) { wd.touch(); try { size_t n = stream->recv(&buf.front(), buf.size(), md, 0.3, false); if (md.error_code == uhd::rx_metadata_t::ERROR_CODE_TIMEOUT) { consecutive_errors = 0; continue; } if (md.error_code == uhd::rx_metadata_t::ERROR_CODE_OVERFLOW) { overflow_count++; consecutive_errors = 0; if (overflow_count % 10 == 1) { std::cerr << "[" << name << "] Overflow #" << overflow_count << std::endl; } continue; } if (md.error_code != uhd::rx_metadata_t::ERROR_CODE_NONE) { error_counter++; consecutive_errors++; std::string err_msg = md.strerror(); if (err_msg.find("LIBUSB") != std::string::npos || err_msg.find("NO_DEVICE") != std::string::npos) { std::cerr << "[" << name << "] ⚠️ ERRORE USB CRITICO: " << err_msg << std::endl; usb_error_detected = true; break; } if (consecutive_errors > 50) { std::cerr << "[" << name << "] Troppi errori consecutivi (" << consecutive_errors << ")" << std::endl; usb_error_detected = true; break; } if (error_counter % 100 == 1) { std::cerr << "[" << name << "] Errore RX: " << err_msg << std::endl; } std::this_thread::sleep_for(std::chrono::milliseconds(10)); continue; } consecutive_errors = 0; if (n > 0) { std::vector<std::complex<float>> data(buf.begin(), buf.begin() + n); queue.push(std::move(data)); counter += n; } } catch (const std::exception& e) { error_counter++; std::string err_msg = e.what(); if (err_msg.find("LIBUSB") != std::string::npos || err_msg.find("NO_DEVICE") != std::string::npos) { std::cerr << "[" << name << "] ⚠️ EXCEPTION USB CRITICA: " << err_msg << std::endl; usb_error_detected = true; break; } std::cerr << "[" << name << "] Exception: " << err_msg << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } std::cout << "[" << name << "] Thread terminato" << std::endl; }; // TX loop con error handling robusto auto tx_loop = [&](uhd::tx_streamer::sptr stream, SafeQueue<std::vector<std::complex<float>>>& queue, std::atomic<uint64_t>& counter, std::atomic<uint64_t>& error_counter, Watchdog& wd, const std::string& name) { std::vector<std::complex<float>> silence(spb, {0.0f, 0.0f}); std::vector<std::complex<float>> data; uhd::tx_metadata_t md; md.start_of_burst = true; md.end_of_burst = false; md.has_time_spec = false; size_t empty_queue_count = 0; size_t consecutive_errors = 0; while (running && !gstop && !usb_error_detected) { wd.touch(); try { if (queue.pop(data, 10)) { empty_queue_count = 0; size_t sent = stream->send(&data.front(), data.size(), md, 0.3); counter += sent; consecutive_errors = 0; if (sent < data.size()) { underrun_count++; } } else { empty_queue_count++; if (empty_queue_count > 5) { stream->send(&silence.front(), silence.size(), md, 0.1); if (empty_queue_count == 6) { underrun_count++; } } } md.start_of_burst = false; } catch (const std::exception& e) { error_counter++; consecutive_errors++; std::string err_msg = e.what(); if (err_msg.find("LIBUSB") != std::string::npos || err_msg.find("NO_DEVICE") != std::string::npos) { std::cerr << "[" << name << "] ⚠️ EXCEPTION USB CRITICA: " << err_msg << std::endl; usb_error_detected = true; break; } if (consecutive_errors > 50) { std::cerr << "[" << name << "] Troppi errori consecutivi (" << consecutive_errors << ")" << std::endl; usb_error_detected = true; break; } if (error_counter % 100 == 1) { std::cerr << "[" << name << "] Errore TX: " << err_msg << std::endl; } std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } // End of burst md.end_of_burst = true; try { stream->send("", 0, md); } catch (...) {} std::cout << "[" << name << "] Thread terminato" << std::endl; }; // Avvia streaming std::cout << "Avvio streaming..." << std::endl; uhd::stream_cmd_t scmd(uhd::stream_cmd_t::STREAM_MODE_START_CONTINUOUS); scmd.stream_now = true; u1rx->issue_stream_cmd(scmd); std::this_thread::sleep_for(std::chrono::milliseconds(50)); u2rx->issue_stream_cmd(scmd); std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Avvia threads std::thread t_rx1([&] { rx_loop(u1rx, q12, rx1_count, rx1_errors, wd_rx1, "RX1"); }); std::thread t_rx2([&] { rx_loop(u2rx, q21, rx2_count, rx2_errors, wd_rx2, "RX2"); }); std::this_thread::sleep_for(std::chrono::milliseconds(200)); std::thread t_tx1([&] { tx_loop(u1tx, q21, tx1_count, tx1_errors, wd_tx1, "TX1"); }); std::thread t_tx2([&] { tx_loop(u2tx, q12, tx2_count, tx2_errors, wd_tx2, "TX2"); }); std::cout << "\n=== RELAY ATTIVO - Ctrl+C per terminare ===\n" << std::endl; // Monitor con watchdog check auto start = std::chrono::steady_clock::now(); auto last_rx1 = rx1_count.load(); auto last_rx2 = rx2_count.load(); while (!gstop && !usb_error_detected) { std::this_thread::sleep_for(std::chrono::seconds(5)); // Check watchdogs std::vector<Watchdog*> watchdogs = {&wd_rx1, &wd_rx2, &wd_tx1, &wd_tx2}; for (auto wd : watchdogs) { if (!wd->is_alive()) { std::cerr << "⚠️ WATCHDOG TIMEOUT: " << wd->get_name() << " non risponde da " << wd->seconds_since_update() << " secondi!" << std::endl; usb_error_detected = true; break; } } if (usb_error_detected) break; auto now = std::chrono::steady_clock::now(); double elapsed = std::chrono::duration<double>(now - start).count(); auto cur_rx1 = rx1_count.load(); auto cur_rx2 = rx2_count.load(); double rate_rx1 = (cur_rx1 - last_rx1) / 5.0 / 1e6; double rate_rx2 = (cur_rx2 - last_rx2) / 5.0 / 1e6; last_rx1 = cur_rx1; last_rx2 = cur_rx2; int64_t mismatch = std::abs((int64_t)cur_rx1 - (int64_t)cur_rx2); std::cout << std::fixed << std::setprecision(2); std::cout << "[" << elapsed/60 << " min] " << "RX: " << rate_rx1 << "/" << rate_rx2 << " Msps | " << "TX: " << tx1_count/1e6 << "/" << tx2_count/1e6 << " M | " << "Q: " << q12.size() << "/" << q21.size() << " (" << q12.get_dropped() << "/" << q21.get_dropped() << " drop) | " << "Err: O" << overflow_count << "/U" << underrun_count << "/RX" << (rx1_errors+rx2_errors) << "/TX" << (tx1_errors+tx2_errors); if (mismatch > 10000) { std::cout << " | ⚠️ MISMATCH: " << mismatch << " samples"; } std::cout << std::endl; } // Shutdown if (usb_error_detected) { std::cerr << "\n⚠️⚠️⚠️ ERRORE USB CRITICO RILEVATO ⚠️⚠️⚠️" << std::endl; std::cerr << "Possibili cause:" << std::endl; std::cerr << "1. USB autosuspend attivo - esegui fix_usb.sh" << std::endl; std::cerr << "2. Cavo/hub USB difettoso" << std::endl; std::cerr << "3. Alimentazione insufficiente" << std::endl; std::cerr << "4. Problemi hardware USRP\n" << std::endl; } std::cout << "Arresto..." << std::endl; running = false; uhd::stream_cmd_t stop_cmd(uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS); try { u1rx->issue_stream_cmd(stop_cmd); } catch (...) {} try { u2rx->issue_stream_cmd(stop_cmd); } catch (...) {} if (t_rx1.joinable()) t_rx1.join(); if (t_rx2.joinable()) t_rx2.join(); if (t_tx1.joinable()) t_tx1.join(); if (t_tx2.joinable()) t_tx2.join(); // Stats finali std::cout << "\n=== Statistiche Finali ===" << std::endl; std::cout << "RX1: " << rx1_count << " samples (" << rx1_errors << " errori)" << std::endl; std::cout << "RX2: " << rx2_count << " samples (" << rx2_errors << " errori)" << std::endl; std::cout << "TX1: " << tx1_count << " samples (" << tx1_errors << " errori)" << std::endl; std::cout << "TX2: " << tx2_count << " samples (" << tx2_errors << " errori)" << std::endl; std::cout << "Mismatch finale: " << std::abs((int64_t)rx1_count - (int64_t)rx2_count) << " samples" << std::endl; std::cout << "===========================\n" << std::endl; return usb_error_detected ? 2 : 0; }
Créé il y a 2 semaines.