#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "ioUringAssemblyEngine.h" #include "pcloudStimulusProducer.h" #include "livoxGen1.h" // #define REGISTER_IOURING_BUFFERS namespace smo { namespace stim_buff { inline LivoxProto1DllState& getLivoxProto1State() { return livoxProto1; } struct DummyLivoxEthHeader { DummyLivoxEthHeader() : version(0xFF), slot(0xFF), id(0xFF), rsvd(0xFF) {} static bool isDummy(const DummyLivoxEthHeader& hdr) { return hdr.version == 0xFF || hdr.slot == 0xFF || hdr.id == 0xFF || hdr.rsvd == 0xFF; } static bool isValid(const DummyLivoxEthHeader& hdr) { return !isDummy(hdr); } uint8_t version, slot, id, rsvd; uint32_t err_code; uint8_t timestamp_type, data_type; uint8_t timestamp[8]; }; IoUringAssemblyEngine::IoUringAssemblyEngine( PcloudStimulusProducer& parent_, size_t nDgramsPerStagingBufferFrame_) : parent(parent_), frameAssemblyDesc(nullptr), ring{}, eventfdFd(-1), eventfdDesc(nullptr), eventfd_value(0), stallTimer(parent_.device->componentThread->getIoService()), shouldAcceptRequests(false), nDgramsPerStagingBufferFrame(nDgramsPerStagingBufferFrame_), assembledSlotsTracker(nDgramsPerStagingBufferFrame_), randomDevice(), randomGenerator(randomDevice()) {} bool IoUringAssemblyEngine::setup() { // Defensive check to prevent double-calling { SpinLock::Guard lock(shouldAcceptRequestsLock); if (shouldAcceptRequests) { throw std::runtime_error(std::string(__func__) + ": setup() called " "while already set up"); } } // Get FrameAssemblyDesc from staging buffer frameAssemblyDesc = static_cast>( parent.assemblyBuffer); if (!frameAssemblyDesc || frameAssemblyDesc->slots.empty()) { return false; } // Get point cloud data socket descriptor from UdpCommandDemuxer auto& livoxState = getLivoxProto1State(); if (!livoxState.livoxProto1_getPcloudDataFdDesc) { return false; } pcloudDataFdDesc = (*livoxState.livoxProto1_getPcloudDataFdDesc)(); if (!pcloudDataFdDesc) { return false; } // Get UDP socket file descriptor int udpFd = pcloudDataFdDesc->native_handle(); if (udpFd < 0) { return false; } // Set up iovecs for each slot for (size_t i = 0; i < frameAssemblyDesc->numSlots; ++i) { assembledSlotsTracker[i].assembled = false; assembledSlotsTracker[i].msgHdr = {}; assembledSlotsTracker[i].msgHdr.msg_iov = &assembledSlotsTracker[i].ioVec; assembledSlotsTracker[i].msgHdr.msg_iovlen = 1; } for (size_t i = 0; i < frameAssemblyDesc->numSlots; ++i) { const auto& slot = frameAssemblyDesc->slots[i]; assembledSlotsTracker[i].ioVec.iov_base = slot.vaddr; assembledSlotsTracker[i].ioVec.iov_len = slot.nBytes; } // Declare iovec early to avoid goto crossing initialization #ifdef REGISTER_IOURING_BUFFERS struct iovec iov; #endif int ret; /** EXPLANATION: * Initialize io_uring ring - allocate SQEs and CQEs for one frame assembly * One SQE per slot (one datagram per slot), plus one extra for cancel * operations, since io_uring_prep_cancel() requires a valid SQE. So we * alloc 1 extra SQE to guarantee that we will always have an available SQE * for cancel operations. */ ret = io_uring_queue_init( static_cast(frameAssemblyDesc->numSlots + 1), &ring, 0); if (ret < 0) { goto cleanup; } #ifdef REGISTER_IOURING_BUFFERS // Register staging buffer with io_uring for DMA-apt I/O iov = parent.assemblyBuffer.getIoUringRegisterIoVec(); ret = io_uring_register_buffers(&ring, &iov, 1); if (ret < 0) { goto cleanup_ring; } #endif // Create eventfd for CQE notifications (used with boost's unified loop) eventfdFd = eventfd(0, EFD_NONBLOCK); if (eventfdFd < 0) { #ifdef REGISTER_IOURING_BUFFERS goto cleanup_buffers; #else goto cleanup_ring; #endif } // Register eventfd with io_uring ret = io_uring_register_eventfd(&ring, eventfdFd); if (ret < 0) { goto cleanup_eventfd; } shouldAcceptRequests = true; return true; cleanup_eventfd: close(eventfdFd); eventfdFd = -1; #ifdef REGISTER_IOURING_BUFFERS cleanup_buffers: io_uring_unregister_buffers(&ring); #endif cleanup_ring: io_uring_queue_exit(&ring); cleanup: return false; } void IoUringAssemblyEngine::finalize() { bool wasAcceptingRequests = stop(); if (eventfdFd >= 0) { io_uring_unregister_eventfd(&ring); close(eventfdFd); eventfdFd = -1; } if (wasAcceptingRequests) { #ifdef REGISTER_IOURING_BUFFERS io_uring_unregister_buffers(&ring); #endif io_uring_queue_exit(&ring); } // Reset state to allow setup() to be called again frameAssemblyDesc = nullptr; } void IoUringAssemblyEngine::resetAndAssembleFrame( resetAndAssembleFrameCbFn onCqeReady) { if (!onCqeReady) { throw std::runtime_error(std::string(__func__) + ": onCqeReady callback is invalid"); } if (!shouldAcceptRequests) { throw std::runtime_error(std::string(__func__) + ": engine is not accepting requests"); } // eventfdDesc should not be valid when resetAndAssembleFrame is called if (eventfdDesc) { throw std::runtime_error(std::string(__func__) + ": eventfdDesc is already set"); } // Store the callback for re-arming onCqeReadyCallback = std::move(onCqeReady); // Reset all assembled slots tracker to false for (auto& slotDesc : assembledSlotsTracker) { slotDesc.assembled = false; } /** EXPLANATION: * Flush eventfd state: poll and read any pending events before creating * descriptor. * Use poll() to check if data is available (non-blocking check). * If data is available, read it to flush. */ struct pollfd pfd; pfd.fd = eventfdFd; pfd.events = POLLIN; pfd.revents = 0; int poll_ret = poll(&pfd, 1, 0); // Timeout 0 = non-blocking if (poll_ret > 0 && (pfd.revents & POLLIN)) { uint64_t discard; ssize_t ret = read(eventfdFd, &discard, sizeof(discard)); (void)ret; // Ignore errors - just trying to flush } eventfdDesc = std::make_unique( parent.device->componentThread->getIoService(), eventfdFd); if (!eventfdDesc) { throw std::runtime_error(std::string(__func__) + ": failed to create eventfd stream descriptor"); } // Get UDP socket file descriptor int udpFd = pcloudDataFdDesc->native_handle(); if (udpFd < 0) { throw std::runtime_error(std::string(__func__) + ": invalid UDP socket file descriptor"); } // Prepare SQEs for each slot in the frame struct io_uring_sqe *sqe; for (size_t i = 0; i < frameAssemblyDesc->numSlots; ++i) { sqe = io_uring_get_sqe(&ring); if (!sqe) { throw std::runtime_error(std::string(__func__) + ": failed to get SQE for slot " + std::to_string(i)); } io_uring_prep_recvmsg(sqe, udpFd, &assembledSlotsTracker[i].msgHdr, 0); // Set user_data to slot index for tracking io_uring_sqe_set_data(sqe, reinterpret_cast(i)); } // Submit all SQEs int ret = io_uring_submit(&ring); if (ret < 0) { throw std::runtime_error(std::string(__func__) + ": io_uring_submit failed: " + std::strerror(errno) + " (errno=" + std::to_string(errno) + ")"); } // Start listening for CQE notifications on eventfd eventfdDesc->async_read_some( boost::asio::buffer(&eventfd_value, sizeof(eventfd_value)), std::bind( &IoUringAssemblyEngine::onEventfdRead, this, std::placeholders::_1, std::placeholders::_2)); } bool IoUringAssemblyEngine::stop() { // Acquire and release lock tightly around setting the flag SpinLock::Guard lock(shouldAcceptRequestsLock); bool wasAcceptingRequests = shouldAcceptRequests; shouldAcceptRequests = false; return wasAcceptingRequests; } void IoUringAssemblyEngine::assemblyCycleComplete() { // Cancel in-flight stall timeout timer stallTimer.cancel(); onCqeReadyCallback = std::move([](void *, int){}); if (frameAssemblyDesc) { struct io_uring_sqe *sqe = io_uring_get_sqe(&ring); if (!sqe) { std::cerr << __func__ << ": failed to get SQE for cancel op. " << "Continuing cleanup without cancelling.\n"; goto cleanup_eventfd; } /* Cancel all in-flight operations on our ring * using IORING_ASYNC_CANCEL_ANY. Identify the CQE for the cancel * op as numSlots since numSlots is an invalid slot index for a * real slot. */ io_uring_prep_cancel( sqe, reinterpret_cast(frameAssemblyDesc->numSlots), IORING_ASYNC_CANCEL_ANY); io_uring_submit(&ring); /* Wait for cancellation to complete. According to the man page, * cancellation is synchronous and a CQE is guaranteed to be * generated by the time submission returns. */ struct io_uring_cqe *cqe; bool sawCancelCqe = false; while (io_uring_peek_cqe(&ring, &cqe) == 0) { // Call seen() on all CQEs for completeness/correctness. io_uring_cqe_seen(&ring, cqe); void *user_data = io_uring_cqe_get_data(cqe); if (user_data == reinterpret_cast( frameAssemblyDesc->numSlots)) { sawCancelCqe = true; } } if (!sawCancelCqe && smoHooksPtr->OptionParser_getOptions().verbose) { std::cerr << __func__ << ": no CQE seen for cancel operation\n"; } } cleanup_eventfd: if (eventfdDesc) { /** EXPLANATION: * The goal here is to ensure that our io_service's event loop will not * get any events from the eventfd after we've called * assemblyCycleComplete(). So we completely deinitialize the eventfd * descriptor. * * But we still want to reuse the underlying eventfd file descriptor, * itself in the next resetAndAssembleFrame() cycle, so we call * release() instead of reset() to ensure that the underlying fd * is not closed. * * However, we need to close the descriptor's association with the * io_service before releasing it, otherwise Boost.Asio will complain * when we try to create a new descriptor with the same fd. */ /** CAVEAT: * There's a rare but real race condition here where the eventfd gets an * event signaled on it, and while boost is internally processing that * event to enqeue our handler, we call cancel() and release() here. * If boost internally has locking on the stream_descriptor object, * this should be fine. But just in case it doesn't, I'm just * documenting that possibility here. * * There's nothing we can really do about it except know that it would * be very rarely happen; and that we can't do anything about it short * of modifying the boost.Asio code. */ eventfdDesc->cancel(); eventfdDesc->release(); /* Destroy the descriptor object (now that it's unregistered, destroying * it won't close the fd since release() transferred ownership back) */ eventfdDesc.reset(); } } // Continuation class for assembleFrameReq class IoUringAssemblyEngine::AssembleFrameReq : public PostedAsynchronousContinuation< IoUringAssemblyEngine::assembleFrameReqCbFn> { public: AssembleFrameReq( IoUringAssemblyEngine& engine_, const std::shared_ptr& caller, Callback cb) : PostedAsynchronousContinuation< IoUringAssemblyEngine::assembleFrameReqCbFn>(caller, cb), engine(engine_), loop(engine_.frameAssemblyDesc->numSlots), timerFired(false), handlerExecuted(false) {} void callOriginalCallback(bool success, AsynchronousLoop loop) { callOriginalCb(success, loop); } public: void assembleFrameReq1_posted( std::shared_ptr context) { SpinLock::Guard lock(engine.shouldAcceptRequestsLock); if (!engine.shouldAcceptRequests) { context->callOriginalCallback(false, AsynchronousLoop(0)); return; } // Initialize loop with number of slots context->loop = AsynchronousLoop(engine.frameAssemblyDesc->numSlots); // Record assembly start time engine.assemblyStartTime = std::chrono::high_resolution_clock::now(); /** FIXME: * I'm suspicious of this std::bind return object here. What if us * setting it to null inside of stop() doesn't actually cause the * object to be destroyed? This would cause this contin's sh_ptr's * reference count to never reach 0, causing a memory leak. */ engine.resetAndAssembleFrame( std::bind(&AssembleFrameReq::assembleFrameReq2_2, context.get(), context, std::placeholders::_1, std::placeholders::_2)); // Set up timeout timer for CONFIG_STIMBUFF_FRAME_PERIOD_MS/2 ms engine.stallTimer.expires_from_now( boost::posix_time::milliseconds( CONFIG_STIMBUFF_FRAME_PERIOD_MS / 2)); engine.stallTimer.async_wait( std::bind(&AssembleFrameReq::assembleFrameReq2_1, context.get(), context, std::placeholders::_1)); } void assembleFrameReq2_1( std::shared_ptr context, const boost::system::error_code& error) { // Check if timer was cancelled (ignore if operation_aborted) if (error == boost::asio::error::operation_aborted) { return; } /** EXPLANATION: * This lock acquisition here will also cover the call to * assembleFrameReq3 below. Because of that, it means that the * requirement that the lock be held while accessing * the metadata that's destroyed in stop() is satisfied. * * In theory though, we shouldn't need to hold the lock into * assembleFrameReq3 below because that function doesn't really access * any state that's destroyed in stop()? But I'm not sure, and we have * indeed seen a SEGFAULT even in the current code with locking, so * I'm going to hold the lock here for now. */ SpinLock::Guard lock(context->engine.shouldAcceptRequestsLock); if (!context->engine.shouldAcceptRequests) { context->engine.assemblyCycleComplete(); context->loop.setRemainingIterationsToFailure(); context->callOriginalCallback(false, context->loop); return; } // Set timer fired flag context->timerFired.store(true); context->assembleFrameReq3(context); } void assembleFrameReq2_2( std::shared_ptr context, void *user_data, int cqe_result) { // NB: The lock was acquired by onEventFdRead before calling this func if (!context->engine.shouldAcceptRequests) { context->engine.assemblyCycleComplete(); context->loop.setRemainingIterationsToFailure(); context->callOriginalCallback(false, context->loop); return; } // Extract index from user_data and mark slot as assembled if successful size_t index = reinterpret_cast(user_data); bool success = (cqe_result >= 0); if (success && index < context->engine.assembledSlotsTracker.size()) { context->engine.assembledSlotsTracker[index].assembled = true; } // Caller decides success: result >= 0 means success if (context->loop.incrementSuccessOrFailureAndTestForCompletionDueTo( success)) { // Loop is complete, call oracle function context->assembleFrameReq3(context); } } void assembleFrameReq3( std::shared_ptr context ) { /** EXPLANATION: * All branch paths that invoke this unifyig oracle function are * expected to already hold the shouldAcceptRequestsLock before calling * it. */ // Ensure we only execute once using atomic exchange if (context->handlerExecuted.exchange(true)) { return; } // Record assembly end time context->engine.assemblyEndTime = std::chrono::high_resolution_clock::now(); // Cancel the timer, stop the engine and process frame, if any. context->engine.assemblyCycleComplete(); /** EXPLANATION: * Timeout doesn't necessarily mean error. * * If we received zero dgrams from the device, that is indeed an error. * But if we received some dgrams, but not all, that is not an error: * it just means we didn't receive as much data as we would have liked. */ // Error: no slots succeeded - no data received successfully. if (context->loop.nSucceeded.load() == 0) { context->callOriginalCallback(false, context->loop); return; } #if 0 // Artificially create random dummy slots for testing context->engine.randomDummySlotFiller(context->loop); #endif // Fill un-assembled slots with dummy datagrams context->engine.fillUnAssembledSlotsWithDummyDgrams(); #if 0 // Print first 4 bytes of each slot (whether assembled or not) if (context->engine.frameAssemblyDesc) { for (size_t i = 0; i < context->engine.frameAssemblyDesc->numSlots; ++i) { context->engine.printSlotBytes(i, 4); } } #endif if (context->loop.nSucceeded.load() >= context->loop.nTotal) { // Success: all or more slots succeeded if (context->loop.nSucceeded.load() > context->loop.nTotal) { std::cerr << __func__ << ": nSucceeded > nTotal: succ (" << context->loop.nSucceeded.load() << ") > nTotal (" << context->loop.nTotal << ")\n"; } context->callOriginalCallback(true, context->loop); return; } if (context->loop.nSucceeded.load() < context->loop.nTotal) { // Success: some slots succeeded (less than total) // Note: dummy fill for un-assembled slots will be implemented later context->callOriginalCallback(true, context->loop); return; } if (smoHooksPtr->OptionParser_getOptions().verbose) { std::cerr << __func__ << ": Invalid state: nSucceeded (" << context->loop.nSucceeded.load() << ") < nTotal (" << context->loop.nTotal << ")" << std::endl; } context->callOriginalCallback(false, context->loop); return; } public: IoUringAssemblyEngine& engine; AsynchronousLoop loop; std::atomic timerFired; std::atomic handlerExecuted; }; void IoUringAssemblyEngine::assembleFrameReq( Callback cb) { { SpinLock::Guard lock(shouldAcceptRequestsLock); if (!shouldAcceptRequests) { cb.callbackFn(false, AsynchronousLoop(0)); return; } } const auto& caller = smoHooksPtr->ComponentThread_getSelf(); auto request = std::make_shared( *this, caller, std::move(cb)); parent.device->componentThread->getIoService().post( STC(std::bind( &AssembleFrameReq::assembleFrameReq1_posted, request.get(), request))); } void IoUringAssemblyEngine::onEventfdRead( const boost::system::error_code& error, std::size_t bytes_transferred) { (void)bytes_transferred; // Ignore cancellation errors if (error == boost::asio::error::operation_aborted) { return; } /** EXPLANATION: * This lock should be held throughout this method to ensure that the * IoUringAssemblyEngine's per-assembly state isn't destroyed while this * handler is running. */ SpinLock::Guard lock(shouldAcceptRequestsLock); /** EXPLANATION: * You'd think we should put check for shouldAcceptRequests here and * `return` here if !shouldAcceptRequests, but we shouldn't because * that would mean that we can't invoke the caller's callback. This would * make the caller freeze forever. * * Instead we just let the onCqeReadyCallback check for * shouldAcceptRequests. That way the onCqeReadyCallback can actually * invoke the caller's callback, as it should. We have no knowledge of the * caller's callback because we don't have access to the caller's * continuation object. The onCqeReadyCallback does have access to it, * so we leave that up to it. */ /** FIXME: * It may be necessary to specifically check for and handle the cancel op * CQE here. I'm not sure as yet though, but I'll highlight it here for now. */ // Process all available CQEs and call callback for each one struct io_uring_cqe *cqe; while (io_uring_peek_cqe(&ring, &cqe) == 0) { // Get user_data from the CQE void* user_data = io_uring_cqe_get_data(cqe); // Get result from the CQE int cqe_result = cqe->res; // Mark the CQE as seen io_uring_cqe_seen(&ring, cqe); /** EXPLANATION: * Call the user-provided callback for this CQE with its user_data and * result. * * 1. Notice that we call the caller's cb *after* marking the CQE as * seen. We may later need to change this if the caller needs * information from the CQE before it is marked as seen. * * 2. Notice that we do not check for or filter out the cancel op CQE * here. The caller's handler will be able to see the cancel op CQE * because of this. */ if (onCqeReadyCallback) { onCqeReadyCallback(user_data, cqe_result); } } /** EXPLANATION: * But we do put a `return` here because we know that at this point, the * caller's callback has already been invoked. */ if (!shouldAcceptRequests || eventfdDesc == nullptr || !eventfdDesc->is_open()) { return; } // Re-arm the eventfd read for next CQE notification eventfdDesc->async_read_some( boost::asio::buffer(&eventfd_value, sizeof(eventfd_value)), std::bind( &IoUringAssemblyEngine::onEventfdRead, this, std::placeholders::_1, std::placeholders::_2)); } void IoUringAssemblyEngine::fillUnAssembledSlotsWithDummyDgrams() { if (!frameAssemblyDesc) { return; } for (size_t i = 0; i < frameAssemblyDesc->numSlots; ++i) { // Only fill slots that were not successfully assembled if (i >= assembledSlotsTracker.size() || assembledSlotsTracker[i].assembled) { continue; } // Placement construct DummyLivoxEthHeader in the slot new (frameAssemblyDesc->slots[i].vaddr) DummyLivoxEthHeader(); } } void IoUringAssemblyEngine::randomDummySlotFiller(AsynchronousLoop& loop) { if (!frameAssemblyDesc) { return; } // Check if there are already dummies (natural dummy instance) uint32_t nSucceeded = loop.nSucceeded.load(); uint32_t nTotal = loop.nTotal; uint32_t nFailed = loop.nFailed.load(); if (nFailed > 0 || nSucceeded < nTotal) { std::cout << __func__ << ": Natural dummy instance detected (nSucceeded=" << nSucceeded << ", nTotal=" << nTotal << ", nFailed=" << nFailed << "), skipping artificial dummy creation" << std::endl; return; } // Randomly select a number of slots to make into dummies (less than total) std::uniform_int_distribution numDummiesDist(1, nTotal - 1); size_t numDummiesToCreate = numDummiesDist(randomGenerator); std::uniform_int_distribution slotIndexDist(0, nTotal - 1); size_t dummiesCreated = 0; size_t maxAttempts = nTotal * 10; // Safety limit to prevent infinite loop size_t attempts = 0; // Mark random slots as unassembled while (dummiesCreated < numDummiesToCreate && attempts < maxAttempts) { ++attempts; size_t randomIndex = slotIndexDist(randomGenerator); // Skip if already unassembled, re-roll if (randomIndex >= assembledSlotsTracker.size() || !assembledSlotsTracker[randomIndex].assembled) { continue; } // Mark as unassembled assembledSlotsTracker[randomIndex].assembled = false; ++dummiesCreated; } if (dummiesCreated < numDummiesToCreate) { std::cerr << __func__ << ": Warning: Could only create " << dummiesCreated << " dummy slots out of " << numDummiesToCreate << " requested (max attempts reached)" << std::endl; numDummiesToCreate = dummiesCreated; } // Update the AsynchronousLoop to reflect the new number of dummies // Since we only reach here when nSucceeded == nTotal and nFailed == 0, // we can directly calculate the new values uint32_t newSucceeded = nTotal - static_cast(numDummiesToCreate); uint32_t newFailed = static_cast(numDummiesToCreate); loop.nSucceeded.store(newSucceeded); loop.nFailed.store(newFailed); std::cout << __func__ << ": Artificially created " << numDummiesToCreate << " dummy slots (nSucceeded: " << nTotal << " -> " << newSucceeded << ", nFailed: 0 -> " << newFailed << ")" << std::endl; } void IoUringAssemblyEngine::printSlotBytes(size_t slotIndex, size_t nBytes) { if (!frameAssemblyDesc) { std::cerr << __func__ << ": frameAssemblyDesc is null" << std::endl; return; } if (slotIndex >= frameAssemblyDesc->numSlots) { std::cerr << __func__ << ": slotIndex " << slotIndex << " out of range (numSlots=" << frameAssemblyDesc->numSlots << ")" << std::endl; return; } const auto& slot = frameAssemblyDesc->slots[slotIndex]; size_t bytesToPrint = std::min(nBytes, static_cast(slot.nBytes)); const uint8_t* data = reinterpret_cast(slot.vaddr); std::cout << __func__ << ": Slot " << slotIndex << " vaddr=" << (void*)slot.vaddr << " (" << bytesToPrint << " bytes):" << std::endl; // Print hex dump format: offset | hex bytes | ASCII const size_t bytesPerLine = 16; for (size_t offset = 0; offset < bytesToPrint; offset += bytesPerLine) { // Print offset std::cout << std::hex << std::setfill('0') << std::setw(4) << offset << ": "; // Print hex bytes for (size_t i = 0; i < bytesPerLine; ++i) { if (offset + i < bytesToPrint) { std::cout << std::setw(2) << static_cast(data[offset + i]) << " "; } else { std::cout << " "; } } // Print ASCII representation std::cout << " |"; for (size_t i = 0; i < bytesPerLine && offset + i < bytesToPrint; ++i) { uint8_t byte = data[offset + i]; char c = (byte >= 32 && byte < 127) ? static_cast(byte) : '.'; std::cout << c; } std::cout << "|" << std::dec << std::endl; } } std::chrono::milliseconds IoUringAssemblyEngine::getAssemblyDuration() const { auto duration = assemblyEndTime - assemblyStartTime; if (duration.count() < 0) { return std::chrono::milliseconds(0); } return std::chrono::duration_cast(duration); } } // namespace stim_buff } // namespace smo