18
18
* along with Elliptics. If not, see <http://www.gnu.org/licenses/>.
19
19
*/
20
20
21
- #include < cocaine/storage.hpp>
21
+ #include " cocaine/storage.hpp"
22
+
23
+ #include " cocaine/elliptics_logger.hpp"
22
24
23
25
#include < cocaine/context.hpp>
24
26
#include < cocaine/dynamic.hpp>
27
29
#include < blackhole/formatter/string.hpp>
28
30
#include < blackhole/v1/attribute.hpp>
29
31
#include < blackhole/v1/logger.hpp>
32
+ #include < cocaine/storage.hpp>
30
33
31
34
namespace cocaine { namespace storage {
32
35
@@ -37,46 +40,6 @@ using namespace cocaine::logging;
37
40
using namespace cocaine ::storage;
38
41
namespace ell = ioremap::elliptics;
39
42
40
- auto map_severity (blackhole::severity_t severity) -> blackhole::severity_t {
41
- if (severity) {
42
- severity -= 1 ;
43
- }
44
- return severity;
45
- }
46
-
47
-
48
- class frontend_t : public blackhole ::base_frontend_t {
49
- public:
50
- frontend_t (std::shared_ptr<logging::logger_t > log, ell::log_level severity)
51
- : log(std::move(log))
52
- , severity(severity)
53
- , formatter(" %(message)s %(...::)s" ) {}
54
-
55
- virtual void handle (const blackhole::log::record_t & record) {
56
- const auto level =
57
- record.extract <dnet_log_level>(blackhole::keyword::severity<dnet_log_level>().name ());
58
-
59
- if (level < severity) {
60
- return ;
61
- }
62
-
63
- const auto mapped = convert (level);
64
-
65
- log ->log (static_cast <int >(mapped), formatter.format (record));
66
- }
67
-
68
- private:
69
- std::shared_ptr<logging::logger_t > log;
70
- ell::log_level severity;
71
- blackhole::formatter::string_t formatter;
72
- };
73
-
74
- log_adapter_t::log_adapter_t (std::shared_ptr<logging::logger_t > log, ell::log_level level)
75
- : ell::logger_base() {
76
- verbosity (DNET_LOG_DEBUG);
77
- add_frontend (std::unique_ptr<frontend_t >(new frontend_t (log , level)));
78
- }
79
-
80
43
namespace {
81
44
82
45
dnet_config parse_json_config (const dynamic_t ::object_t & args) {
@@ -94,16 +57,18 @@ dnet_config parse_json_config(const dynamic_t::object_t& args) {
94
57
95
58
}
96
59
60
+ std::unique_ptr<logging::logger_t > elliptics_storage_t::make_elliptics_logger (const std::string& name) const {
61
+ auto logger_name = format (" storage/{}/elliptics_client" , name);
62
+ return std::unique_ptr<logging::logger_t >(new elliptics_logger_t (m_context.log (logger_name)));
63
+ }
64
+
97
65
elliptics_storage_t::elliptics_storage_t (context_t &context, const std::string &name, const dynamic_t &args)
98
66
: category_type(context, name, args)
99
67
, m_context(context)
100
- , m_log(context.log(name)) // TODO: It was with attributes: {{"storage", "elliptics"}}.
101
- , // XXX: dynamic_t from cocaine can't convert int to uint, and DNET_LOG_INFO being an enum value is int
102
- m_log_adapter (m_log, static_cast <ioremap::elliptics::log_level>(
103
- args.as_object().at(" verbosity" , uint (DNET_LOG_INFO)).as_uint()))
68
+ , m_log(context.log(name, {{" storage" , " elliptics" }}))
104
69
, m_read_latest(args.as_object().at(" read_latest" , false ).as_bool())
105
70
, m_config(parse_json_config(args.as_object()))
106
- , m_node(ell::logger(m_log_adapter, blackhole::log:: attributes_t {{ " storage " , { " elliptics " }}} ), m_config)
71
+ , m_node(make_elliptics_logger(name ), m_config)
107
72
, m_session(m_node) {
108
73
dynamic_t ::array_t nodes = args.as_object ().at (" nodes" ).as_array ();
109
74
@@ -224,29 +189,31 @@ void elliptics_storage_t::remove(const std::string &collection, const std::strin
224
189
});
225
190
}
226
191
227
- ell::async_read_result elliptics_storage_t::async_read (const std::string &collection, const std::string &key)
228
- {
229
- using namespace std ::placeholders;
230
-
231
- COCAINE_LOG_DEBUG (m_log, " reading the '{}' object, collection: '{}'" , key, collection);
232
-
192
+ ell::session elliptics_storage_t::prepare_session (const std::string collection, int timeout) {
233
193
ell::session session = m_session.clone ();
234
194
session.set_namespace (collection.data (), collection.size ());
235
- session.set_timeout (m_timeouts.read );
195
+ session.set_timeout (timeout);
196
+ auto trace = trace_t::current ();
197
+ if (!trace.empty ()) {
198
+ session.set_trace_id (trace.get_trace_id ());
199
+ }
200
+ if (trace.verbose ()) {
201
+ session.set_trace_bit (true );
202
+ }
203
+ return session;
204
+ }
236
205
206
+ ell::async_read_result elliptics_storage_t::async_read (const std::string &collection, const std::string &key)
207
+ {
208
+ COCAINE_LOG_DEBUG (m_log, " reading the '{}' object, collection: '{}'" , key, collection);
209
+ ell::session session = prepare_session (collection, m_timeouts.read );
237
210
return session.read_data (key, 0 , 0 );
238
211
}
239
212
240
213
ell::async_read_result elliptics_storage_t::async_read_latest (const std::string &collection, const std::string &key)
241
214
{
242
- using namespace std ::placeholders;
243
-
244
215
COCAINE_LOG_DEBUG (m_log, " reading the '{}' object, collection: '{}'" , key, collection);
245
-
246
- ell::session session = m_session.clone ();
247
- session.set_namespace (collection.data (), collection.size ());
248
- session.set_timeout (m_timeouts.read );
249
-
216
+ ell::session session = prepare_session (collection, m_timeouts.read );
250
217
return session.read_latest (key, 0 , 0 );
251
218
}
252
219
@@ -258,38 +225,27 @@ ell::async_write_result elliptics_storage_t::async_write(const std::string &coll
258
225
259
226
COCAINE_LOG_DEBUG (m_log, " writing the '{}' object, collection: '{}'" , key, collection);
260
227
261
- ell::session session = m_session.clone ();
262
- session.set_namespace (collection.data (), collection.size ());
228
+ ell::session session = prepare_session (collection, m_timeouts.write );
263
229
session.set_filter (ioremap::elliptics::filters::all_with_ack);
264
- session.set_timeout (m_timeouts.write );
265
230
session.set_checker (m_success_copies_num);
266
-
267
231
return session.write_data (key, blob, 0 );
268
232
}
269
233
270
234
ell::async_remove_result elliptics_storage_t::async_remove (const std::string &collection, const std::string &key)
271
235
{
272
- using namespace std ::placeholders;
273
-
274
236
COCAINE_LOG_DEBUG (m_log, " removing the '{}' object, collection: '{}'" , key, collection);
275
237
276
- ell::session session = m_session.clone ();
277
- session.set_namespace (collection.data (), collection.size ());
278
- session.set_timeout (m_timeouts.remove );
238
+ ell::session session = prepare_session (collection, m_timeouts.remove );
279
239
session.set_checker (m_success_copies_num);
280
-
281
240
return session.remove (key);
282
241
}
283
242
284
243
ioremap::elliptics::async_read_result elliptics_storage_t::async_cache_read (const std::string &collection, const std::string &key)
285
244
{
286
245
COCAINE_LOG_DEBUG (m_log, " cache reading the '{}' object, collection: '{}'" , key, collection);
287
246
288
- ell::session session = m_session.clone ();
289
- session.set_namespace (collection.data (), collection.size ());
247
+ ell::session session = prepare_session (collection, m_timeouts.read );
290
248
session.set_ioflags (DNET_IO_FLAGS_CACHE | DNET_IO_FLAGS_CACHE_ONLY);
291
- session.set_timeout (m_timeouts.read );
292
-
293
249
return session.read_data (key, 0 , 0 );
294
250
}
295
251
@@ -298,12 +254,9 @@ ioremap::elliptics::async_write_result elliptics_storage_t::async_cache_write(co
298
254
{
299
255
COCAINE_LOG_DEBUG (m_log, " cache writing the '{}' object, collection: '{}'" , key, collection);
300
256
301
- ell::session session = m_session.clone ();
302
- session.set_namespace (collection.data (), collection.size ());
257
+ ell::session session = prepare_session (collection, m_timeouts.write );
303
258
session.set_ioflags (DNET_IO_FLAGS_CACHE | DNET_IO_FLAGS_CACHE_ONLY);
304
- session.set_timeout (m_timeouts.write );
305
259
session.set_checker (m_success_copies_num);
306
-
307
260
return session.write_cache (key, blob, timeout);
308
261
}
309
262
@@ -312,9 +265,7 @@ std::pair<ioremap::elliptics::async_read_result, elliptics_storage_t::key_name_m
312
265
{
313
266
COCAINE_LOG_DEBUG (m_log, " bulk reading, collection: '{}'" , collection);
314
267
315
- ell::session session = m_session.clone ();
316
- session.set_namespace (collection.data (), collection.size ());
317
- session.set_timeout (m_timeouts.read );
268
+ ell::session session = prepare_session (collection, m_timeouts.read );
318
269
319
270
key_name_map keys_map;
320
271
dnet_raw_id id;
@@ -332,10 +283,8 @@ ioremap::elliptics::async_write_result elliptics_storage_t::async_bulk_write(con
332
283
{
333
284
COCAINE_LOG_DEBUG (m_log, " bulk writing, collection: '{}'" , collection);
334
285
335
- ell::session session = m_session.clone ();
336
- session.set_namespace (collection.data (), collection.size ());
286
+ ell::session session = prepare_session (collection, m_timeouts.write );
337
287
session.set_filter (ell::filters::all);
338
- session.set_timeout (m_timeouts.write );
339
288
session.set_checker (m_success_copies_num);
340
289
341
290
std::vector<dnet_io_attr> ios;
0 commit comments