Skip to content

Commit

Permalink
Large Observes: Prevent server sending new response if active response
Browse files Browse the repository at this point in the history
If the response is large, it can take time to transfer over the body.  If
the server then initiates a new unsolicited response during the existing
transfer, then overflow / retry issues start to occur, compounded if multiple
unsolicited responses are triggerred during an existing transfer.

Hold back the notify of the new unsolicited response(s) until the
existing response has completed (or failed for another reason).

Other code has been tightened up incase the server or client are
not libcoap based and also exhibit this behavior, so to have a better chance
of recovery.
  • Loading branch information
mrdeep1 authored and obgm committed Jun 17, 2022
1 parent 5a10ce4 commit 20f15a1
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 20 deletions.
3 changes: 2 additions & 1 deletion include/coap3/coap_block_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ struct coap_lg_xmit_t {
} b;
coap_pdu_t pdu; /**< skeletal PDU */
coap_tick_t last_payload; /**< Last time MAX_PAYLOAD was sent or 0 */
coap_tick_t last_used; /**< Last time all data sent or 0 */
coap_tick_t last_all_sent; /**< Last time all data sent or 0 */
coap_tick_t last_obs; /**< Last time used (Observe tracking) or 0 */
coap_release_large_data_t release_func; /**< large data de-alloc function */
void *app_ptr; /**< applicaton provided ptr for de-alloc function */
};
Expand Down
48 changes: 29 additions & 19 deletions src/block.c
Original file line number Diff line number Diff line change
Expand Up @@ -574,16 +574,15 @@ coap_add_data_large_internal(coap_session_t *session,
coap_log(LOG_DEBUG, "** %s: lg_xmit %p initialized\n",
coap_session_str(session), (void*)lg_xmit);
/* Update lg_xmit with large data information */
memset(lg_xmit, 0, sizeof(coap_lg_xmit_t));
lg_xmit->blk_size = blk_size;
lg_xmit->option = option;
lg_xmit->last_block = 0;
lg_xmit->data = data;
lg_xmit->length = length;
lg_xmit->offset = 0;
lg_xmit->last_payload = 0;
lg_xmit->last_used = 0;
lg_xmit->release_func = release_func;
lg_xmit->app_ptr = app_ptr;
coap_ticks(&lg_xmit->last_obs);
if (COAP_PDU_IS_REQUEST(pdu)) {
/* Need to keep original token for updating response PDUs */
lg_xmit->b.b1.app_token = coap_new_binary(pdu->token_length);
Expand Down Expand Up @@ -858,18 +857,18 @@ coap_block_check_lg_xmit_timeouts(coap_session_t *session, coap_tick_t now) {
coap_tick_t tim_rem = -1;

LL_FOREACH_SAFE(session->lg_xmit, p, q) {
if (p->last_used == 0 || p->option == COAP_OPTION_BLOCK2) {
if (p->last_all_sent == 0 || p->option == COAP_OPTION_BLOCK2) {
continue;
}
if (p->last_used + idle_timeout <= now) {
if (p->last_all_sent + idle_timeout <= now) {
/* Expire this entry */
LL_DELETE(session->lg_xmit, p);
coap_block_delete_lg_xmit(session, p);
}
else {
/* Delay until the lg_xmit needs to expire */
if (tim_rem > p->last_used + idle_timeout - now)
tim_rem = p->last_used + idle_timeout - now;
if (tim_rem > p->last_all_sent + idle_timeout - now)
tim_rem = p->last_all_sent + idle_timeout - now;
}
}
return tim_rem;
Expand Down Expand Up @@ -1121,6 +1120,10 @@ coap_handle_request_send_block(coap_session_t *session,

if (!coap_get_block_b(session, pdu, COAP_OPTION_BLOCK2, &block))
return 0;
if (block.num == 0) {
/* Get a fresh copy of the data */
return 0;
}
block_opt = COAP_OPTION_BLOCK2;
LL_FOREACH(session->lg_xmit, p) {
size_t chunk;
Expand All @@ -1139,7 +1142,7 @@ coap_handle_request_send_block(coap_session_t *session,
/* try out the next one */
continue;
}
p->last_used = 0;
p->last_all_sent = 0;
etag_opt = coap_check_option(pdu, COAP_OPTION_ETAG, &opt_iter);
if (etag_opt) {
uint64_t etag = coap_decode_var_bytes8(coap_opt_value(etag_opt),
Expand All @@ -1156,6 +1159,7 @@ coap_handle_request_send_block(coap_session_t *session,
}

/* lg_xmit (response) found */
coap_ticks(&p->last_obs);

chunk = (size_t)1 << (p->blk_size + 4);
if (block_opt) {
Expand Down Expand Up @@ -1231,8 +1235,7 @@ coap_handle_request_send_block(coap_session_t *session,
coap_opt_filter_t drop_options;

memset(&drop_options, 0, sizeof(coap_opt_filter_t));
if (block.num != 0)
coap_option_filter_set(&drop_options, COAP_OPTION_OBSERVE);
coap_option_filter_set(&drop_options, COAP_OPTION_OBSERVE);
out_pdu = coap_pdu_duplicate(&p->pdu, session, pdu->token_length,
pdu->token, &drop_options);
if (!out_pdu) {
Expand All @@ -1247,11 +1250,13 @@ coap_handle_request_send_block(coap_session_t *session,
*/
coap_option_iterator_init(&p->pdu, &opt_iter, COAP_OPT_ALL);
while ((option = coap_option_next(&opt_iter))) {
if (opt_iter.number == COAP_OPTION_OBSERVE && block.num != 0)
if (opt_iter.number == COAP_OPTION_OBSERVE)
continue;
if (!coap_add_option_internal(response, opt_iter.number,
coap_opt_length(option),
coap_opt_value(option))) {
if (opt_iter.number == p->option)
continue;
if (!coap_insert_option(response, opt_iter.number,
coap_opt_length(option),
coap_opt_value(option))) {
goto internal_issue;
}
}
Expand All @@ -1276,7 +1281,7 @@ coap_handle_request_send_block(coap_session_t *session,
}
if (!(p->offset + chunk < p->length)) {
/* Last block - keep in cache for 4 * ACK_TIMOUT */
coap_ticks(&p->last_used);
coap_ticks(&p->last_all_sent);
}
if (p->b.b2.maxage_expire) {
coap_tick_t now;
Expand All @@ -1290,7 +1295,7 @@ coap_handle_request_send_block(coap_session_t *session,
else {
rem = 0;
/* Entry needs to be expired */
coap_ticks(&p->last_used);
coap_ticks(&p->last_all_sent);
}
if (!coap_update_option(out_pdu, COAP_OPTION_MAXAGE,
coap_encode_var_safe8(buf,
Expand Down Expand Up @@ -1321,7 +1326,7 @@ coap_handle_request_send_block(coap_session_t *session,
(const uint8_t *)error_phrase);
/* Keep in cache for 4 * ACK_TIMOUT */
if (p)
coap_ticks(&p->last_used);
coap_ticks(&p->last_all_sent);
goto skip_app_handler;
} /* end of LL_FOREACH() */
return 0;
Expand Down Expand Up @@ -1936,6 +1941,7 @@ coap_handle_response_get_block(coap_context_t *context,
uint64_t token = STATE_TOKEN_FULL(p->state_token,
++p->retry_counter);
size_t len = coap_encode_var_safe8(buf, sizeof(token), token);
coap_opt_filter_t drop_options;

coap_log(LOG_WARNING,
"Data body updated during receipt - new request started\n");
Expand All @@ -1946,7 +1952,10 @@ coap_handle_response_get_block(coap_context_t *context,
coap_free_type(COAP_STRING, p->body_data);
p->body_data = NULL;

pdu = coap_pdu_duplicate(&p->pdu, session, len, buf, NULL);
coap_session_new_token(session, &len, buf);
memset(&drop_options, 0, sizeof(coap_opt_filter_t));
coap_option_filter_set(&drop_options, COAP_OPTION_OBSERVE);
pdu = coap_pdu_duplicate(&p->pdu, session, len, buf, &drop_options);
if (!pdu)
goto fail_resp;

Expand Down Expand Up @@ -2146,6 +2155,7 @@ coap_handle_response_get_block(coap_context_t *context,
if (coap_get_block_b(session, rcvd, COAP_OPTION_BLOCK2, &block)) {
coap_log(LOG_DEBUG, "** %s: large body receive internal issue\n",
coap_session_str(session));
goto skip_app_handler;
}
}
else if (COAP_RESPONSE_CLASS(rcvd->code) == 2) {
Expand All @@ -2161,7 +2171,7 @@ coap_handle_response_get_block(coap_context_t *context,
coap_get_data(rcvd, &length, &data);
rcvd->body_offset = block.num*chunk;
rcvd->body_total = block.num*chunk + length + (block.m ? 1 : 0);
return 0;
goto call_app_handler;
}
}
if (have_block) {
Expand Down
21 changes: 21 additions & 0 deletions src/pdu.c
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,27 @@ coap_add_option_internal(coap_pdu_t *pdu, coap_option_num_t number, size_t len,
case COAP_OPTION_URI_QUERY:
case COAP_OPTION_LOCATION_QUERY:
break;
/* Protest at the known non-repeatable options and ignore them */
case COAP_OPTION_URI_HOST:
case COAP_OPTION_IF_NONE_MATCH:
case COAP_OPTION_OBSERVE:
case COAP_OPTION_URI_PORT:
case COAP_OPTION_OSCORE:
case COAP_OPTION_CONTENT_FORMAT:
case COAP_OPTION_MAXAGE:
case COAP_OPTION_HOP_LIMIT:
case COAP_OPTION_ACCEPT:
case COAP_OPTION_BLOCK2:
case COAP_OPTION_BLOCK1:
case COAP_OPTION_SIZE2:
case COAP_OPTION_PROXY_URI:
case COAP_OPTION_PROXY_SCHEME:
case COAP_OPTION_SIZE1:
case COAP_OPTION_NORESPONSE:
coap_log(LOG_INFO,
"Option number %d is not defined as repeatable - dropped\n",
number);
return 0;
default:
coap_log(LOG_INFO, "Option number %d is not defined as repeatable\n",
number);
Expand Down
12 changes: 12 additions & 0 deletions src/resource.c
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,7 @@ coap_notify_observers(coap_context_t *context, coap_resource_t *r,
uint8_t buf[4];
coap_string_t *query;
coap_block_b_t block;
coap_tick_t now;

if (r->observable && (r->dirty || r->partiallydirty)) {
r->partiallydirty = 0;
Expand All @@ -906,6 +907,17 @@ coap_notify_observers(coap_context_t *context, coap_resource_t *r,
if (obs->session->con_active >= COAP_NSTART(obs->session) &&
((r->flags & COAP_RESOURCE_FLAGS_NOTIFY_CON) ||
(obs->non_cnt >= COAP_OBS_MAX_NON))) {
/* Waiting for the previous unsolicited response to finish */
r->partiallydirty = 1;
obs->dirty = 1;
context->observe_pending = 1;
continue;
}
coap_ticks(&now);
if (obs->session->lg_xmit && obs->session->lg_xmit->last_all_sent == 0 &&
obs->session->lg_xmit->last_obs &&
(obs->session->lg_xmit->last_obs + 2*COAP_TICKS_PER_SECOND) > now) {
/* Waiting for the previous blocked unsolicited response to finish */
r->partiallydirty = 1;
obs->dirty = 1;
context->observe_pending = 1;
Expand Down

0 comments on commit 20f15a1

Please sign in to comment.